- patches.apparmor/remove_suid_new_case_in_2.6.22.diff: Merge fix.
[linux-flexiantxendom0-3.2.10.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
86                                     struct dlm_message *ms);
87 static int receive_extralen(struct dlm_message *ms);
88 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
89
90 /*
91  * Lock compatibilty matrix - thanks Steve
92  * UN = Unlocked state. Not really a state, used as a flag
93  * PD = Padding. Used to make the matrix a nice power of two in size
94  * Other states are the same as the VMS DLM.
95  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
96  */
97
98 static const int __dlm_compat_matrix[8][8] = {
99       /* UN NL CR CW PR PW EX PD */
100         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
101         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
102         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
103         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
104         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
105         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
106         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
107         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
108 };
109
110 /*
111  * This defines the direction of transfer of LVB data.
112  * Granted mode is the row; requested mode is the column.
113  * Usage: matrix[grmode+1][rqmode+1]
114  * 1 = LVB is returned to the caller
115  * 0 = LVB is written to the resource
116  * -1 = nothing happens to the LVB
117  */
118
119 const int dlm_lvb_operations[8][8] = {
120         /* UN   NL  CR  CW  PR  PW  EX  PD*/
121         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
122         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
123         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
124         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
125         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
126         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
127         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
128         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
129 };
130
131 #define modes_compat(gr, rq) \
132         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
133
134 int dlm_modes_compat(int mode1, int mode2)
135 {
136         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
137 }
138
139 /*
140  * Compatibility matrix for conversions with QUECVT set.
141  * Granted mode is the row; requested mode is the column.
142  * Usage: matrix[grmode+1][rqmode+1]
143  */
144
145 static const int __quecvt_compat_matrix[8][8] = {
146       /* UN NL CR CW PR PW EX PD */
147         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
148         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
149         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
150         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
151         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
152         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
153         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
154         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
155 };
156
157 void dlm_print_lkb(struct dlm_lkb *lkb)
158 {
159         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
160                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
161                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
162                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
163                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
164 }
165
166 void dlm_print_rsb(struct dlm_rsb *r)
167 {
168         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
169                r->res_nodeid, r->res_flags, r->res_first_lkid,
170                r->res_recover_locks_count, r->res_name);
171 }
172
173 void dlm_dump_rsb(struct dlm_rsb *r)
174 {
175         struct dlm_lkb *lkb;
176
177         dlm_print_rsb(r);
178
179         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
180                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
181         printk(KERN_ERR "rsb lookup list\n");
182         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
183                 dlm_print_lkb(lkb);
184         printk(KERN_ERR "rsb grant queue:\n");
185         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
186                 dlm_print_lkb(lkb);
187         printk(KERN_ERR "rsb convert queue:\n");
188         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
189                 dlm_print_lkb(lkb);
190         printk(KERN_ERR "rsb wait queue:\n");
191         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
192                 dlm_print_lkb(lkb);
193 }
194
195 /* Threads cannot use the lockspace while it's being recovered */
196
197 static inline void lock_recovery(struct dlm_ls *ls)
198 {
199         down_read(&ls->ls_in_recovery);
200 }
201
202 static inline void unlock_recovery(struct dlm_ls *ls)
203 {
204         up_read(&ls->ls_in_recovery);
205 }
206
207 static inline int lock_recovery_try(struct dlm_ls *ls)
208 {
209         return down_read_trylock(&ls->ls_in_recovery);
210 }
211
212 static inline int can_be_queued(struct dlm_lkb *lkb)
213 {
214         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
215 }
216
217 static inline int force_blocking_asts(struct dlm_lkb *lkb)
218 {
219         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
220 }
221
222 static inline int is_demoted(struct dlm_lkb *lkb)
223 {
224         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
225 }
226
227 static inline int is_altmode(struct dlm_lkb *lkb)
228 {
229         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
230 }
231
232 static inline int is_granted(struct dlm_lkb *lkb)
233 {
234         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
235 }
236
237 static inline int is_remote(struct dlm_rsb *r)
238 {
239         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
240         return !!r->res_nodeid;
241 }
242
243 static inline int is_process_copy(struct dlm_lkb *lkb)
244 {
245         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
246 }
247
248 static inline int is_master_copy(struct dlm_lkb *lkb)
249 {
250         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
251                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
252         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
253 }
254
255 static inline int middle_conversion(struct dlm_lkb *lkb)
256 {
257         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
258             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
259                 return 1;
260         return 0;
261 }
262
263 static inline int down_conversion(struct dlm_lkb *lkb)
264 {
265         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
266 }
267
268 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
269 {
270         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
271 }
272
273 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
274 {
275         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
276 }
277
278 static inline int is_overlap(struct dlm_lkb *lkb)
279 {
280         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
281                                   DLM_IFL_OVERLAP_CANCEL));
282 }
283
284 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
285 {
286         if (is_master_copy(lkb))
287                 return;
288
289         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
290
291         lkb->lkb_lksb->sb_status = rv;
292         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
293
294         dlm_add_ast(lkb, AST_COMP);
295 }
296
297 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
298 {
299         queue_cast(r, lkb,
300                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
301 }
302
303 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
304 {
305         if (is_master_copy(lkb))
306                 send_bast(r, lkb, rqmode);
307         else {
308                 lkb->lkb_bastmode = rqmode;
309                 dlm_add_ast(lkb, AST_BAST);
310         }
311 }
312
313 /*
314  * Basic operations on rsb's and lkb's
315  */
316
317 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
318 {
319         struct dlm_rsb *r;
320
321         r = allocate_rsb(ls, len);
322         if (!r)
323                 return NULL;
324
325         r->res_ls = ls;
326         r->res_length = len;
327         memcpy(r->res_name, name, len);
328         mutex_init(&r->res_mutex);
329
330         INIT_LIST_HEAD(&r->res_lookup);
331         INIT_LIST_HEAD(&r->res_grantqueue);
332         INIT_LIST_HEAD(&r->res_convertqueue);
333         INIT_LIST_HEAD(&r->res_waitqueue);
334         INIT_LIST_HEAD(&r->res_root_list);
335         INIT_LIST_HEAD(&r->res_recover_list);
336
337         return r;
338 }
339
340 static int search_rsb_list(struct list_head *head, char *name, int len,
341                            unsigned int flags, struct dlm_rsb **r_ret)
342 {
343         struct dlm_rsb *r;
344         int error = 0;
345
346         list_for_each_entry(r, head, res_hashchain) {
347                 if (len == r->res_length && !memcmp(name, r->res_name, len))
348                         goto found;
349         }
350         return -EBADR;
351
352  found:
353         if (r->res_nodeid && (flags & R_MASTER))
354                 error = -ENOTBLK;
355         *r_ret = r;
356         return error;
357 }
358
359 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
360                        unsigned int flags, struct dlm_rsb **r_ret)
361 {
362         struct dlm_rsb *r;
363         int error;
364
365         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
366         if (!error) {
367                 kref_get(&r->res_ref);
368                 goto out;
369         }
370         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
371         if (error)
372                 goto out;
373
374         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
375
376         if (dlm_no_directory(ls))
377                 goto out;
378
379         if (r->res_nodeid == -1) {
380                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
381                 r->res_first_lkid = 0;
382         } else if (r->res_nodeid > 0) {
383                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
384                 r->res_first_lkid = 0;
385         } else {
386                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
387                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
388         }
389  out:
390         *r_ret = r;
391         return error;
392 }
393
394 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
395                       unsigned int flags, struct dlm_rsb **r_ret)
396 {
397         int error;
398         write_lock(&ls->ls_rsbtbl[b].lock);
399         error = _search_rsb(ls, name, len, b, flags, r_ret);
400         write_unlock(&ls->ls_rsbtbl[b].lock);
401         return error;
402 }
403
404 /*
405  * Find rsb in rsbtbl and potentially create/add one
406  *
407  * Delaying the release of rsb's has a similar benefit to applications keeping
408  * NL locks on an rsb, but without the guarantee that the cached master value
409  * will still be valid when the rsb is reused.  Apps aren't always smart enough
410  * to keep NL locks on an rsb that they may lock again shortly; this can lead
411  * to excessive master lookups and removals if we don't delay the release.
412  *
413  * Searching for an rsb means looking through both the normal list and toss
414  * list.  When found on the toss list the rsb is moved to the normal list with
415  * ref count of 1; when found on normal list the ref count is incremented.
416  */
417
418 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
419                     unsigned int flags, struct dlm_rsb **r_ret)
420 {
421         struct dlm_rsb *r, *tmp;
422         uint32_t hash, bucket;
423         int error = 0;
424
425         if (dlm_no_directory(ls))
426                 flags |= R_CREATE;
427
428         hash = jhash(name, namelen, 0);
429         bucket = hash & (ls->ls_rsbtbl_size - 1);
430
431         error = search_rsb(ls, name, namelen, bucket, flags, &r);
432         if (!error)
433                 goto out;
434
435         if (error == -EBADR && !(flags & R_CREATE))
436                 goto out;
437
438         /* the rsb was found but wasn't a master copy */
439         if (error == -ENOTBLK)
440                 goto out;
441
442         error = -ENOMEM;
443         r = create_rsb(ls, name, namelen);
444         if (!r)
445                 goto out;
446
447         r->res_hash = hash;
448         r->res_bucket = bucket;
449         r->res_nodeid = -1;
450         kref_init(&r->res_ref);
451
452         /* With no directory, the master can be set immediately */
453         if (dlm_no_directory(ls)) {
454                 int nodeid = dlm_dir_nodeid(r);
455                 if (nodeid == dlm_our_nodeid())
456                         nodeid = 0;
457                 r->res_nodeid = nodeid;
458         }
459
460         write_lock(&ls->ls_rsbtbl[bucket].lock);
461         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
462         if (!error) {
463                 write_unlock(&ls->ls_rsbtbl[bucket].lock);
464                 free_rsb(r);
465                 r = tmp;
466                 goto out;
467         }
468         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
469         write_unlock(&ls->ls_rsbtbl[bucket].lock);
470         error = 0;
471  out:
472         *r_ret = r;
473         return error;
474 }
475
476 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
477                  unsigned int flags, struct dlm_rsb **r_ret)
478 {
479         return find_rsb(ls, name, namelen, flags, r_ret);
480 }
481
482 /* This is only called to add a reference when the code already holds
483    a valid reference to the rsb, so there's no need for locking. */
484
485 static inline void hold_rsb(struct dlm_rsb *r)
486 {
487         kref_get(&r->res_ref);
488 }
489
490 void dlm_hold_rsb(struct dlm_rsb *r)
491 {
492         hold_rsb(r);
493 }
494
495 static void toss_rsb(struct kref *kref)
496 {
497         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
498         struct dlm_ls *ls = r->res_ls;
499
500         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
501         kref_init(&r->res_ref);
502         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
503         r->res_toss_time = jiffies;
504         if (r->res_lvbptr) {
505                 free_lvb(r->res_lvbptr);
506                 r->res_lvbptr = NULL;
507         }
508 }
509
510 /* When all references to the rsb are gone it's transfered to
511    the tossed list for later disposal. */
512
513 static void put_rsb(struct dlm_rsb *r)
514 {
515         struct dlm_ls *ls = r->res_ls;
516         uint32_t bucket = r->res_bucket;
517
518         write_lock(&ls->ls_rsbtbl[bucket].lock);
519         kref_put(&r->res_ref, toss_rsb);
520         write_unlock(&ls->ls_rsbtbl[bucket].lock);
521 }
522
523 void dlm_put_rsb(struct dlm_rsb *r)
524 {
525         put_rsb(r);
526 }
527
528 /* See comment for unhold_lkb */
529
530 static void unhold_rsb(struct dlm_rsb *r)
531 {
532         int rv;
533         rv = kref_put(&r->res_ref, toss_rsb);
534         DLM_ASSERT(!rv, dlm_dump_rsb(r););
535 }
536
537 static void kill_rsb(struct kref *kref)
538 {
539         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
540
541         /* All work is done after the return from kref_put() so we
542            can release the write_lock before the remove and free. */
543
544         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
545         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
546         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
547         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
548         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
549         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
550 }
551
552 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
553    The rsb must exist as long as any lkb's for it do. */
554
555 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
556 {
557         hold_rsb(r);
558         lkb->lkb_resource = r;
559 }
560
561 static void detach_lkb(struct dlm_lkb *lkb)
562 {
563         if (lkb->lkb_resource) {
564                 put_rsb(lkb->lkb_resource);
565                 lkb->lkb_resource = NULL;
566         }
567 }
568
569 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
570 {
571         struct dlm_lkb *lkb, *tmp;
572         uint32_t lkid = 0;
573         uint16_t bucket;
574
575         lkb = allocate_lkb(ls);
576         if (!lkb)
577                 return -ENOMEM;
578
579         lkb->lkb_nodeid = -1;
580         lkb->lkb_grmode = DLM_LOCK_IV;
581         kref_init(&lkb->lkb_ref);
582         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
583         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
584
585         get_random_bytes(&bucket, sizeof(bucket));
586         bucket &= (ls->ls_lkbtbl_size - 1);
587
588         write_lock(&ls->ls_lkbtbl[bucket].lock);
589
590         /* counter can roll over so we must verify lkid is not in use */
591
592         while (lkid == 0) {
593                 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
594
595                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
596                                     lkb_idtbl_list) {
597                         if (tmp->lkb_id != lkid)
598                                 continue;
599                         lkid = 0;
600                         break;
601                 }
602         }
603
604         lkb->lkb_id = lkid;
605         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
606         write_unlock(&ls->ls_lkbtbl[bucket].lock);
607
608         *lkb_ret = lkb;
609         return 0;
610 }
611
612 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
613 {
614         struct dlm_lkb *lkb;
615         uint16_t bucket = (lkid >> 16);
616
617         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
618                 if (lkb->lkb_id == lkid)
619                         return lkb;
620         }
621         return NULL;
622 }
623
624 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
625 {
626         struct dlm_lkb *lkb;
627         uint16_t bucket = (lkid >> 16);
628
629         if (bucket >= ls->ls_lkbtbl_size)
630                 return -EBADSLT;
631
632         read_lock(&ls->ls_lkbtbl[bucket].lock);
633         lkb = __find_lkb(ls, lkid);
634         if (lkb)
635                 kref_get(&lkb->lkb_ref);
636         read_unlock(&ls->ls_lkbtbl[bucket].lock);
637
638         *lkb_ret = lkb;
639         return lkb ? 0 : -ENOENT;
640 }
641
642 static void kill_lkb(struct kref *kref)
643 {
644         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
645
646         /* All work is done after the return from kref_put() so we
647            can release the write_lock before the detach_lkb */
648
649         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
650 }
651
652 /* __put_lkb() is used when an lkb may not have an rsb attached to
653    it so we need to provide the lockspace explicitly */
654
655 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
656 {
657         uint16_t bucket = (lkb->lkb_id >> 16);
658
659         write_lock(&ls->ls_lkbtbl[bucket].lock);
660         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
661                 list_del(&lkb->lkb_idtbl_list);
662                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
663
664                 detach_lkb(lkb);
665
666                 /* for local/process lkbs, lvbptr points to caller's lksb */
667                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
668                         free_lvb(lkb->lkb_lvbptr);
669                 free_lkb(lkb);
670                 return 1;
671         } else {
672                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
673                 return 0;
674         }
675 }
676
677 int dlm_put_lkb(struct dlm_lkb *lkb)
678 {
679         struct dlm_ls *ls;
680
681         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
682         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
683
684         ls = lkb->lkb_resource->res_ls;
685         return __put_lkb(ls, lkb);
686 }
687
688 /* This is only called to add a reference when the code already holds
689    a valid reference to the lkb, so there's no need for locking. */
690
691 static inline void hold_lkb(struct dlm_lkb *lkb)
692 {
693         kref_get(&lkb->lkb_ref);
694 }
695
696 /* This is called when we need to remove a reference and are certain
697    it's not the last ref.  e.g. del_lkb is always called between a
698    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
699    put_lkb would work fine, but would involve unnecessary locking */
700
701 static inline void unhold_lkb(struct dlm_lkb *lkb)
702 {
703         int rv;
704         rv = kref_put(&lkb->lkb_ref, kill_lkb);
705         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
706 }
707
708 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
709                             int mode)
710 {
711         struct dlm_lkb *lkb = NULL;
712
713         list_for_each_entry(lkb, head, lkb_statequeue)
714                 if (lkb->lkb_rqmode < mode)
715                         break;
716
717         if (!lkb)
718                 list_add_tail(new, head);
719         else
720                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
721 }
722
723 /* add/remove lkb to rsb's grant/convert/wait queue */
724
725 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
726 {
727         kref_get(&lkb->lkb_ref);
728
729         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
730
731         lkb->lkb_status = status;
732
733         switch (status) {
734         case DLM_LKSTS_WAITING:
735                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
736                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
737                 else
738                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
739                 break;
740         case DLM_LKSTS_GRANTED:
741                 /* convention says granted locks kept in order of grmode */
742                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
743                                 lkb->lkb_grmode);
744                 break;
745         case DLM_LKSTS_CONVERT:
746                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
747                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
748                 else
749                         list_add_tail(&lkb->lkb_statequeue,
750                                       &r->res_convertqueue);
751                 break;
752         default:
753                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
754         }
755 }
756
757 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
758 {
759         lkb->lkb_status = 0;
760         list_del(&lkb->lkb_statequeue);
761         unhold_lkb(lkb);
762 }
763
764 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
765 {
766         hold_lkb(lkb);
767         del_lkb(r, lkb);
768         add_lkb(r, lkb, sts);
769         unhold_lkb(lkb);
770 }
771
772 static int msg_reply_type(int mstype)
773 {
774         switch (mstype) {
775         case DLM_MSG_REQUEST:
776                 return DLM_MSG_REQUEST_REPLY;
777         case DLM_MSG_CONVERT:
778                 return DLM_MSG_CONVERT_REPLY;
779         case DLM_MSG_UNLOCK:
780                 return DLM_MSG_UNLOCK_REPLY;
781         case DLM_MSG_CANCEL:
782                 return DLM_MSG_CANCEL_REPLY;
783         case DLM_MSG_LOOKUP:
784                 return DLM_MSG_LOOKUP_REPLY;
785         }
786         return -1;
787 }
788
789 /* add/remove lkb from global waiters list of lkb's waiting for
790    a reply from a remote node */
791
792 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
793 {
794         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
795         int error = 0;
796
797         mutex_lock(&ls->ls_waiters_mutex);
798
799         if (is_overlap_unlock(lkb) ||
800             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
801                 error = -EINVAL;
802                 goto out;
803         }
804
805         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
806                 switch (mstype) {
807                 case DLM_MSG_UNLOCK:
808                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
809                         break;
810                 case DLM_MSG_CANCEL:
811                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
812                         break;
813                 default:
814                         error = -EBUSY;
815                         goto out;
816                 }
817                 lkb->lkb_wait_count++;
818                 hold_lkb(lkb);
819
820                 log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
821                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
822                           lkb->lkb_wait_count, lkb->lkb_flags);
823                 goto out;
824         }
825
826         DLM_ASSERT(!lkb->lkb_wait_count,
827                    dlm_print_lkb(lkb);
828                    printk("wait_count %d\n", lkb->lkb_wait_count););
829
830         lkb->lkb_wait_count++;
831         lkb->lkb_wait_type = mstype;
832         hold_lkb(lkb);
833         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
834  out:
835         if (error)
836                 log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
837                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
838                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
839         mutex_unlock(&ls->ls_waiters_mutex);
840         return error;
841 }
842
843 /* We clear the RESEND flag because we might be taking an lkb off the waiters
844    list as part of process_requestqueue (e.g. a lookup that has an optimized
845    request reply on the requestqueue) between dlm_recover_waiters_pre() which
846    set RESEND and dlm_recover_waiters_post() */
847
848 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
849 {
850         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
851         int overlap_done = 0;
852
853         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
854                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
855                 overlap_done = 1;
856                 goto out_del;
857         }
858
859         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
860                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
861                 overlap_done = 1;
862                 goto out_del;
863         }
864
865         /* N.B. type of reply may not always correspond to type of original
866            msg due to lookup->request optimization, verify others? */
867
868         if (lkb->lkb_wait_type) {
869                 lkb->lkb_wait_type = 0;
870                 goto out_del;
871         }
872
873         log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
874                   lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
875         return -1;
876
877  out_del:
878         /* the force-unlock/cancel has completed and we haven't recvd a reply
879            to the op that was in progress prior to the unlock/cancel; we
880            give up on any reply to the earlier op.  FIXME: not sure when/how
881            this would happen */
882
883         if (overlap_done && lkb->lkb_wait_type) {
884                 log_error(ls, "remove_from_waiters %x reply %d give up on %d",
885                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
886                 lkb->lkb_wait_count--;
887                 lkb->lkb_wait_type = 0;
888         }
889
890         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
891
892         lkb->lkb_flags &= ~DLM_IFL_RESEND;
893         lkb->lkb_wait_count--;
894         if (!lkb->lkb_wait_count)
895                 list_del_init(&lkb->lkb_wait_reply);
896         unhold_lkb(lkb);
897         return 0;
898 }
899
900 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
901 {
902         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
903         int error;
904
905         mutex_lock(&ls->ls_waiters_mutex);
906         error = _remove_from_waiters(lkb, mstype);
907         mutex_unlock(&ls->ls_waiters_mutex);
908         return error;
909 }
910
911 /* Handles situations where we might be processing a "fake" or "stub" reply in
912    which we can't try to take waiters_mutex again. */
913
914 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
915 {
916         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
917         int error;
918
919         if (ms != &ls->ls_stub_ms)
920                 mutex_lock(&ls->ls_waiters_mutex);
921         error = _remove_from_waiters(lkb, ms->m_type);
922         if (ms != &ls->ls_stub_ms)
923                 mutex_unlock(&ls->ls_waiters_mutex);
924         return error;
925 }
926
927 static void dir_remove(struct dlm_rsb *r)
928 {
929         int to_nodeid;
930
931         if (dlm_no_directory(r->res_ls))
932                 return;
933
934         to_nodeid = dlm_dir_nodeid(r);
935         if (to_nodeid != dlm_our_nodeid())
936                 send_remove(r);
937         else
938                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
939                                      r->res_name, r->res_length);
940 }
941
942 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
943    found since they are in order of newest to oldest? */
944
945 static int shrink_bucket(struct dlm_ls *ls, int b)
946 {
947         struct dlm_rsb *r;
948         int count = 0, found;
949
950         for (;;) {
951                 found = 0;
952                 write_lock(&ls->ls_rsbtbl[b].lock);
953                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
954                                             res_hashchain) {
955                         if (!time_after_eq(jiffies, r->res_toss_time +
956                                            dlm_config.ci_toss_secs * HZ))
957                                 continue;
958                         found = 1;
959                         break;
960                 }
961
962                 if (!found) {
963                         write_unlock(&ls->ls_rsbtbl[b].lock);
964                         break;
965                 }
966
967                 if (kref_put(&r->res_ref, kill_rsb)) {
968                         list_del(&r->res_hashchain);
969                         write_unlock(&ls->ls_rsbtbl[b].lock);
970
971                         if (is_master(r))
972                                 dir_remove(r);
973                         free_rsb(r);
974                         count++;
975                 } else {
976                         write_unlock(&ls->ls_rsbtbl[b].lock);
977                         log_error(ls, "tossed rsb in use %s", r->res_name);
978                 }
979         }
980
981         return count;
982 }
983
984 void dlm_scan_rsbs(struct dlm_ls *ls)
985 {
986         int i;
987
988         if (dlm_locking_stopped(ls))
989                 return;
990
991         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
992                 shrink_bucket(ls, i);
993                 cond_resched();
994         }
995 }
996
997 /* lkb is master or local copy */
998
999 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1000 {
1001         int b, len = r->res_ls->ls_lvblen;
1002
1003         /* b=1 lvb returned to caller
1004            b=0 lvb written to rsb or invalidated
1005            b=-1 do nothing */
1006
1007         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1008
1009         if (b == 1) {
1010                 if (!lkb->lkb_lvbptr)
1011                         return;
1012
1013                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1014                         return;
1015
1016                 if (!r->res_lvbptr)
1017                         return;
1018
1019                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1020                 lkb->lkb_lvbseq = r->res_lvbseq;
1021
1022         } else if (b == 0) {
1023                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1024                         rsb_set_flag(r, RSB_VALNOTVALID);
1025                         return;
1026                 }
1027
1028                 if (!lkb->lkb_lvbptr)
1029                         return;
1030
1031                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1032                         return;
1033
1034                 if (!r->res_lvbptr)
1035                         r->res_lvbptr = allocate_lvb(r->res_ls);
1036
1037                 if (!r->res_lvbptr)
1038                         return;
1039
1040                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1041                 r->res_lvbseq++;
1042                 lkb->lkb_lvbseq = r->res_lvbseq;
1043                 rsb_clear_flag(r, RSB_VALNOTVALID);
1044         }
1045
1046         if (rsb_flag(r, RSB_VALNOTVALID))
1047                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1048 }
1049
1050 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1051 {
1052         if (lkb->lkb_grmode < DLM_LOCK_PW)
1053                 return;
1054
1055         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1056                 rsb_set_flag(r, RSB_VALNOTVALID);
1057                 return;
1058         }
1059
1060         if (!lkb->lkb_lvbptr)
1061                 return;
1062
1063         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1064                 return;
1065
1066         if (!r->res_lvbptr)
1067                 r->res_lvbptr = allocate_lvb(r->res_ls);
1068
1069         if (!r->res_lvbptr)
1070                 return;
1071
1072         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1073         r->res_lvbseq++;
1074         rsb_clear_flag(r, RSB_VALNOTVALID);
1075 }
1076
1077 /* lkb is process copy (pc) */
1078
1079 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1080                             struct dlm_message *ms)
1081 {
1082         int b;
1083
1084         if (!lkb->lkb_lvbptr)
1085                 return;
1086
1087         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1088                 return;
1089
1090         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1091         if (b == 1) {
1092                 int len = receive_extralen(ms);
1093                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1094                 lkb->lkb_lvbseq = ms->m_lvbseq;
1095         }
1096 }
1097
1098 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1099    remove_lock -- used for unlock, removes lkb from granted
1100    revert_lock -- used for cancel, moves lkb from convert to granted
1101    grant_lock  -- used for request and convert, adds lkb to granted or
1102                   moves lkb from convert or waiting to granted
1103
1104    Each of these is used for master or local copy lkb's.  There is
1105    also a _pc() variation used to make the corresponding change on
1106    a process copy (pc) lkb. */
1107
1108 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1109 {
1110         del_lkb(r, lkb);
1111         lkb->lkb_grmode = DLM_LOCK_IV;
1112         /* this unhold undoes the original ref from create_lkb()
1113            so this leads to the lkb being freed */
1114         unhold_lkb(lkb);
1115 }
1116
1117 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1118 {
1119         set_lvb_unlock(r, lkb);
1120         _remove_lock(r, lkb);
1121 }
1122
1123 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1124 {
1125         _remove_lock(r, lkb);
1126 }
1127
1128 /* returns: 0 did nothing
1129             1 moved lock to granted
1130            -1 removed lock */
1131
1132 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1133 {
1134         int rv = 0;
1135
1136         lkb->lkb_rqmode = DLM_LOCK_IV;
1137
1138         switch (lkb->lkb_status) {
1139         case DLM_LKSTS_GRANTED:
1140                 break;
1141         case DLM_LKSTS_CONVERT:
1142                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1143                 rv = 1;
1144                 break;
1145         case DLM_LKSTS_WAITING:
1146                 del_lkb(r, lkb);
1147                 lkb->lkb_grmode = DLM_LOCK_IV;
1148                 /* this unhold undoes the original ref from create_lkb()
1149                    so this leads to the lkb being freed */
1150                 unhold_lkb(lkb);
1151                 rv = -1;
1152                 break;
1153         default:
1154                 log_print("invalid status for revert %d", lkb->lkb_status);
1155         }
1156         return rv;
1157 }
1158
1159 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1160 {
1161         return revert_lock(r, lkb);
1162 }
1163
1164 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1165 {
1166         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1167                 lkb->lkb_grmode = lkb->lkb_rqmode;
1168                 if (lkb->lkb_status)
1169                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1170                 else
1171                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1172         }
1173
1174         lkb->lkb_rqmode = DLM_LOCK_IV;
1175 }
1176
1177 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1178 {
1179         set_lvb_lock(r, lkb);
1180         _grant_lock(r, lkb);
1181         lkb->lkb_highbast = 0;
1182 }
1183
1184 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1185                           struct dlm_message *ms)
1186 {
1187         set_lvb_lock_pc(r, lkb, ms);
1188         _grant_lock(r, lkb);
1189 }
1190
1191 /* called by grant_pending_locks() which means an async grant message must
1192    be sent to the requesting node in addition to granting the lock if the
1193    lkb belongs to a remote node. */
1194
1195 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1196 {
1197         grant_lock(r, lkb);
1198         if (is_master_copy(lkb))
1199                 send_grant(r, lkb);
1200         else
1201                 queue_cast(r, lkb, 0);
1202 }
1203
1204 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1205    change the granted/requested modes.  We're munging things accordingly in
1206    the process copy.
1207    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1208    conversion deadlock
1209    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1210    compatible with other granted locks */
1211
1212 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1213 {
1214         if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1215                 log_print("munge_demoted %x invalid reply type %d",
1216                           lkb->lkb_id, ms->m_type);
1217                 return;
1218         }
1219
1220         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1221                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1222                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1223                 return;
1224         }
1225
1226         lkb->lkb_grmode = DLM_LOCK_NL;
1227 }
1228
1229 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1230 {
1231         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1232             ms->m_type != DLM_MSG_GRANT) {
1233                 log_print("munge_altmode %x invalid reply type %d",
1234                           lkb->lkb_id, ms->m_type);
1235                 return;
1236         }
1237
1238         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1239                 lkb->lkb_rqmode = DLM_LOCK_PR;
1240         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1241                 lkb->lkb_rqmode = DLM_LOCK_CW;
1242         else {
1243                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1244                 dlm_print_lkb(lkb);
1245         }
1246 }
1247
1248 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1249 {
1250         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1251                                            lkb_statequeue);
1252         if (lkb->lkb_id == first->lkb_id)
1253                 return 1;
1254
1255         return 0;
1256 }
1257
1258 /* Check if the given lkb conflicts with another lkb on the queue. */
1259
1260 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1261 {
1262         struct dlm_lkb *this;
1263
1264         list_for_each_entry(this, head, lkb_statequeue) {
1265                 if (this == lkb)
1266                         continue;
1267                 if (!modes_compat(this, lkb))
1268                         return 1;
1269         }
1270         return 0;
1271 }
1272
1273 /*
1274  * "A conversion deadlock arises with a pair of lock requests in the converting
1275  * queue for one resource.  The granted mode of each lock blocks the requested
1276  * mode of the other lock."
1277  *
1278  * Part 2: if the granted mode of lkb is preventing the first lkb in the
1279  * convert queue from being granted, then demote lkb (set grmode to NL).
1280  * This second form requires that we check for conv-deadlk even when
1281  * now == 0 in _can_be_granted().
1282  *
1283  * Example:
1284  * Granted Queue: empty
1285  * Convert Queue: NL->EX (first lock)
1286  *                PR->EX (second lock)
1287  *
1288  * The first lock can't be granted because of the granted mode of the second
1289  * lock and the second lock can't be granted because it's not first in the
1290  * list.  We demote the granted mode of the second lock (the lkb passed to this
1291  * function).
1292  *
1293  * After the resolution, the "grant pending" function needs to go back and try
1294  * to grant locks on the convert queue again since the first lock can now be
1295  * granted.
1296  */
1297
1298 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1299 {
1300         struct dlm_lkb *this, *first = NULL, *self = NULL;
1301
1302         list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1303                 if (!first)
1304                         first = this;
1305                 if (this == lkb) {
1306                         self = lkb;
1307                         continue;
1308                 }
1309
1310                 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1311                         return 1;
1312         }
1313
1314         /* if lkb is on the convert queue and is preventing the first
1315            from being granted, then there's deadlock and we demote lkb.
1316            multiple converting locks may need to do this before the first
1317            converting lock can be granted. */
1318
1319         if (self && self != first) {
1320                 if (!modes_compat(lkb, first) &&
1321                     !queue_conflict(&rsb->res_grantqueue, first))
1322                         return 1;
1323         }
1324
1325         return 0;
1326 }
1327
1328 /*
1329  * Return 1 if the lock can be granted, 0 otherwise.
1330  * Also detect and resolve conversion deadlocks.
1331  *
1332  * lkb is the lock to be granted
1333  *
1334  * now is 1 if the function is being called in the context of the
1335  * immediate request, it is 0 if called later, after the lock has been
1336  * queued.
1337  *
1338  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1339  */
1340
1341 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1342 {
1343         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1344
1345         /*
1346          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1347          * a new request for a NL mode lock being blocked.
1348          *
1349          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1350          * request, then it would be granted.  In essence, the use of this flag
1351          * tells the Lock Manager to expedite theis request by not considering
1352          * what may be in the CONVERTING or WAITING queues...  As of this
1353          * writing, the EXPEDITE flag can be used only with new requests for NL
1354          * mode locks.  This flag is not valid for conversion requests.
1355          *
1356          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1357          * conversion or used with a non-NL requested mode.  We also know an
1358          * EXPEDITE request is always granted immediately, so now must always
1359          * be 1.  The full condition to grant an expedite request: (now &&
1360          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1361          * therefore be shortened to just checking the flag.
1362          */
1363
1364         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1365                 return 1;
1366
1367         /*
1368          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1369          * added to the remaining conditions.
1370          */
1371
1372         if (queue_conflict(&r->res_grantqueue, lkb))
1373                 goto out;
1374
1375         /*
1376          * 6-3: By default, a conversion request is immediately granted if the
1377          * requested mode is compatible with the modes of all other granted
1378          * locks
1379          */
1380
1381         if (queue_conflict(&r->res_convertqueue, lkb))
1382                 goto out;
1383
1384         /*
1385          * 6-5: But the default algorithm for deciding whether to grant or
1386          * queue conversion requests does not by itself guarantee that such
1387          * requests are serviced on a "first come first serve" basis.  This, in
1388          * turn, can lead to a phenomenon known as "indefinate postponement".
1389          *
1390          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1391          * the system service employed to request a lock conversion.  This flag
1392          * forces certain conversion requests to be queued, even if they are
1393          * compatible with the granted modes of other locks on the same
1394          * resource.  Thus, the use of this flag results in conversion requests
1395          * being ordered on a "first come first servce" basis.
1396          *
1397          * DCT: This condition is all about new conversions being able to occur
1398          * "in place" while the lock remains on the granted queue (assuming
1399          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1400          * doesn't _have_ to go onto the convert queue where it's processed in
1401          * order.  The "now" variable is necessary to distinguish converts
1402          * being received and processed for the first time now, because once a
1403          * convert is moved to the conversion queue the condition below applies
1404          * requiring fifo granting.
1405          */
1406
1407         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1408                 return 1;
1409
1410         /*
1411          * The NOORDER flag is set to avoid the standard vms rules on grant
1412          * order.
1413          */
1414
1415         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1416                 return 1;
1417
1418         /*
1419          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1420          * granted until all other conversion requests ahead of it are granted
1421          * and/or canceled.
1422          */
1423
1424         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1425                 return 1;
1426
1427         /*
1428          * 6-4: By default, a new request is immediately granted only if all
1429          * three of the following conditions are satisfied when the request is
1430          * issued:
1431          * - The queue of ungranted conversion requests for the resource is
1432          *   empty.
1433          * - The queue of ungranted new requests for the resource is empty.
1434          * - The mode of the new request is compatible with the most
1435          *   restrictive mode of all granted locks on the resource.
1436          */
1437
1438         if (now && !conv && list_empty(&r->res_convertqueue) &&
1439             list_empty(&r->res_waitqueue))
1440                 return 1;
1441
1442         /*
1443          * 6-4: Once a lock request is in the queue of ungranted new requests,
1444          * it cannot be granted until the queue of ungranted conversion
1445          * requests is empty, all ungranted new requests ahead of it are
1446          * granted and/or canceled, and it is compatible with the granted mode
1447          * of the most restrictive lock granted on the resource.
1448          */
1449
1450         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1451             first_in_list(lkb, &r->res_waitqueue))
1452                 return 1;
1453
1454  out:
1455         /*
1456          * The following, enabled by CONVDEADLK, departs from VMS.
1457          */
1458
1459         if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1460             conversion_deadlock_detect(r, lkb)) {
1461                 lkb->lkb_grmode = DLM_LOCK_NL;
1462                 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1463         }
1464
1465         return 0;
1466 }
1467
1468 /*
1469  * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1470  * simple way to provide a big optimization to applications that can use them.
1471  */
1472
1473 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1474 {
1475         uint32_t flags = lkb->lkb_exflags;
1476         int rv;
1477         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1478
1479         rv = _can_be_granted(r, lkb, now);
1480         if (rv)
1481                 goto out;
1482
1483         if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1484                 goto out;
1485
1486         if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1487                 alt = DLM_LOCK_PR;
1488         else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1489                 alt = DLM_LOCK_CW;
1490
1491         if (alt) {
1492                 lkb->lkb_rqmode = alt;
1493                 rv = _can_be_granted(r, lkb, now);
1494                 if (rv)
1495                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1496                 else
1497                         lkb->lkb_rqmode = rqmode;
1498         }
1499  out:
1500         return rv;
1501 }
1502
1503 static int grant_pending_convert(struct dlm_rsb *r, int high)
1504 {
1505         struct dlm_lkb *lkb, *s;
1506         int hi, demoted, quit, grant_restart, demote_restart;
1507
1508         quit = 0;
1509  restart:
1510         grant_restart = 0;
1511         demote_restart = 0;
1512         hi = DLM_LOCK_IV;
1513
1514         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1515                 demoted = is_demoted(lkb);
1516                 if (can_be_granted(r, lkb, 0)) {
1517                         grant_lock_pending(r, lkb);
1518                         grant_restart = 1;
1519                 } else {
1520                         hi = max_t(int, lkb->lkb_rqmode, hi);
1521                         if (!demoted && is_demoted(lkb))
1522                                 demote_restart = 1;
1523                 }
1524         }
1525
1526         if (grant_restart)
1527                 goto restart;
1528         if (demote_restart && !quit) {
1529                 quit = 1;
1530                 goto restart;
1531         }
1532
1533         return max_t(int, high, hi);
1534 }
1535
1536 static int grant_pending_wait(struct dlm_rsb *r, int high)
1537 {
1538         struct dlm_lkb *lkb, *s;
1539
1540         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1541                 if (can_be_granted(r, lkb, 0))
1542                         grant_lock_pending(r, lkb);
1543                 else
1544                         high = max_t(int, lkb->lkb_rqmode, high);
1545         }
1546
1547         return high;
1548 }
1549
1550 static void grant_pending_locks(struct dlm_rsb *r)
1551 {
1552         struct dlm_lkb *lkb, *s;
1553         int high = DLM_LOCK_IV;
1554
1555         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1556
1557         high = grant_pending_convert(r, high);
1558         high = grant_pending_wait(r, high);
1559
1560         if (high == DLM_LOCK_IV)
1561                 return;
1562
1563         /*
1564          * If there are locks left on the wait/convert queue then send blocking
1565          * ASTs to granted locks based on the largest requested mode (high)
1566          * found above. FIXME: highbast < high comparison not valid for PR/CW.
1567          */
1568
1569         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1570                 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1571                     !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1572                         queue_bast(r, lkb, high);
1573                         lkb->lkb_highbast = high;
1574                 }
1575         }
1576 }
1577
1578 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1579                             struct dlm_lkb *lkb)
1580 {
1581         struct dlm_lkb *gr;
1582
1583         list_for_each_entry(gr, head, lkb_statequeue) {
1584                 if (gr->lkb_bastaddr &&
1585                     gr->lkb_highbast < lkb->lkb_rqmode &&
1586                     !modes_compat(gr, lkb)) {
1587                         queue_bast(r, gr, lkb->lkb_rqmode);
1588                         gr->lkb_highbast = lkb->lkb_rqmode;
1589                 }
1590         }
1591 }
1592
1593 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1594 {
1595         send_bast_queue(r, &r->res_grantqueue, lkb);
1596 }
1597
1598 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1599 {
1600         send_bast_queue(r, &r->res_grantqueue, lkb);
1601         send_bast_queue(r, &r->res_convertqueue, lkb);
1602 }
1603
1604 /* set_master(r, lkb) -- set the master nodeid of a resource
1605
1606    The purpose of this function is to set the nodeid field in the given
1607    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1608    known, it can just be copied to the lkb and the function will return
1609    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1610    before it can be copied to the lkb.
1611
1612    When the rsb nodeid is being looked up remotely, the initial lkb
1613    causing the lookup is kept on the ls_waiters list waiting for the
1614    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1615    on the rsb's res_lookup list until the master is verified.
1616
1617    Return values:
1618    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1619    1: the rsb master is not available and the lkb has been placed on
1620       a wait queue
1621 */
1622
1623 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1624 {
1625         struct dlm_ls *ls = r->res_ls;
1626         int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1627
1628         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1629                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1630                 r->res_first_lkid = lkb->lkb_id;
1631                 lkb->lkb_nodeid = r->res_nodeid;
1632                 return 0;
1633         }
1634
1635         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1636                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1637                 return 1;
1638         }
1639
1640         if (r->res_nodeid == 0) {
1641                 lkb->lkb_nodeid = 0;
1642                 return 0;
1643         }
1644
1645         if (r->res_nodeid > 0) {
1646                 lkb->lkb_nodeid = r->res_nodeid;
1647                 return 0;
1648         }
1649
1650         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1651
1652         dir_nodeid = dlm_dir_nodeid(r);
1653
1654         if (dir_nodeid != our_nodeid) {
1655                 r->res_first_lkid = lkb->lkb_id;
1656                 send_lookup(r, lkb);
1657                 return 1;
1658         }
1659
1660         for (;;) {
1661                 /* It's possible for dlm_scand to remove an old rsb for
1662                    this same resource from the toss list, us to create
1663                    a new one, look up the master locally, and find it
1664                    already exists just before dlm_scand does the
1665                    dir_remove() on the previous rsb. */
1666
1667                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1668                                        r->res_length, &ret_nodeid);
1669                 if (!error)
1670                         break;
1671                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1672                 schedule();
1673         }
1674
1675         if (ret_nodeid == our_nodeid) {
1676                 r->res_first_lkid = 0;
1677                 r->res_nodeid = 0;
1678                 lkb->lkb_nodeid = 0;
1679         } else {
1680                 r->res_first_lkid = lkb->lkb_id;
1681                 r->res_nodeid = ret_nodeid;
1682                 lkb->lkb_nodeid = ret_nodeid;
1683         }
1684         return 0;
1685 }
1686
1687 static void process_lookup_list(struct dlm_rsb *r)
1688 {
1689         struct dlm_lkb *lkb, *safe;
1690
1691         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1692                 list_del_init(&lkb->lkb_rsb_lookup);
1693                 _request_lock(r, lkb);
1694                 schedule();
1695         }
1696 }
1697
1698 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1699
1700 static void confirm_master(struct dlm_rsb *r, int error)
1701 {
1702         struct dlm_lkb *lkb;
1703
1704         if (!r->res_first_lkid)
1705                 return;
1706
1707         switch (error) {
1708         case 0:
1709         case -EINPROGRESS:
1710                 r->res_first_lkid = 0;
1711                 process_lookup_list(r);
1712                 break;
1713
1714         case -EAGAIN:
1715                 /* the remote master didn't queue our NOQUEUE request;
1716                    make a waiting lkb the first_lkid */
1717
1718                 r->res_first_lkid = 0;
1719
1720                 if (!list_empty(&r->res_lookup)) {
1721                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1722                                          lkb_rsb_lookup);
1723                         list_del_init(&lkb->lkb_rsb_lookup);
1724                         r->res_first_lkid = lkb->lkb_id;
1725                         _request_lock(r, lkb);
1726                 } else
1727                         r->res_nodeid = -1;
1728                 break;
1729
1730         default:
1731                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1732         }
1733 }
1734
1735 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1736                          int namelen, uint32_t parent_lkid, void *ast,
1737                          void *astarg, void *bast, struct dlm_args *args)
1738 {
1739         int rv = -EINVAL;
1740
1741         /* check for invalid arg usage */
1742
1743         if (mode < 0 || mode > DLM_LOCK_EX)
1744                 goto out;
1745
1746         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1747                 goto out;
1748
1749         if (flags & DLM_LKF_CANCEL)
1750                 goto out;
1751
1752         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1753                 goto out;
1754
1755         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1756                 goto out;
1757
1758         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1759                 goto out;
1760
1761         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1762                 goto out;
1763
1764         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1765                 goto out;
1766
1767         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1768                 goto out;
1769
1770         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1771                 goto out;
1772
1773         if (!ast || !lksb)
1774                 goto out;
1775
1776         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1777                 goto out;
1778
1779         /* parent/child locks not yet supported */
1780         if (parent_lkid)
1781                 goto out;
1782
1783         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1784                 goto out;
1785
1786         /* these args will be copied to the lkb in validate_lock_args,
1787            it cannot be done now because when converting locks, fields in
1788            an active lkb cannot be modified before locking the rsb */
1789
1790         args->flags = flags;
1791         args->astaddr = ast;
1792         args->astparam = (long) astarg;
1793         args->bastaddr = bast;
1794         args->mode = mode;
1795         args->lksb = lksb;
1796         rv = 0;
1797  out:
1798         return rv;
1799 }
1800
1801 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1802 {
1803         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1804                       DLM_LKF_FORCEUNLOCK))
1805                 return -EINVAL;
1806
1807         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
1808                 return -EINVAL;
1809
1810         args->flags = flags;
1811         args->astparam = (long) astarg;
1812         return 0;
1813 }
1814
1815 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1816                               struct dlm_args *args)
1817 {
1818         int rv = -EINVAL;
1819
1820         if (args->flags & DLM_LKF_CONVERT) {
1821                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1822                         goto out;
1823
1824                 if (args->flags & DLM_LKF_QUECVT &&
1825                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1826                         goto out;
1827
1828                 rv = -EBUSY;
1829                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1830                         goto out;
1831
1832                 if (lkb->lkb_wait_type)
1833                         goto out;
1834
1835                 if (is_overlap(lkb))
1836                         goto out;
1837         }
1838
1839         lkb->lkb_exflags = args->flags;
1840         lkb->lkb_sbflags = 0;
1841         lkb->lkb_astaddr = args->astaddr;
1842         lkb->lkb_astparam = args->astparam;
1843         lkb->lkb_bastaddr = args->bastaddr;
1844         lkb->lkb_rqmode = args->mode;
1845         lkb->lkb_lksb = args->lksb;
1846         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1847         lkb->lkb_ownpid = (int) current->pid;
1848         rv = 0;
1849  out:
1850         return rv;
1851 }
1852
1853 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
1854    for success */
1855
1856 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
1857    because there may be a lookup in progress and it's valid to do
1858    cancel/unlockf on it */
1859
1860 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1861 {
1862         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1863         int rv = -EINVAL;
1864
1865         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
1866                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
1867                 dlm_print_lkb(lkb);
1868                 goto out;
1869         }
1870
1871         /* an lkb may still exist even though the lock is EOL'ed due to a
1872            cancel, unlock or failed noqueue request; an app can't use these
1873            locks; return same error as if the lkid had not been found at all */
1874
1875         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
1876                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
1877                 rv = -ENOENT;
1878                 goto out;
1879         }
1880
1881         /* an lkb may be waiting for an rsb lookup to complete where the
1882            lookup was initiated by another lock */
1883
1884         if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
1885                 if (!list_empty(&lkb->lkb_rsb_lookup)) {
1886                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
1887                         list_del_init(&lkb->lkb_rsb_lookup);
1888                         queue_cast(lkb->lkb_resource, lkb,
1889                                    args->flags & DLM_LKF_CANCEL ?
1890                                    -DLM_ECANCEL : -DLM_EUNLOCK);
1891                         unhold_lkb(lkb); /* undoes create_lkb() */
1892                         rv = -EBUSY;
1893                         goto out;
1894                 }
1895         }
1896
1897         /* cancel not allowed with another cancel/unlock in progress */
1898
1899         if (args->flags & DLM_LKF_CANCEL) {
1900                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
1901                         goto out;
1902
1903                 if (is_overlap(lkb))
1904                         goto out;
1905
1906                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1907                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1908                         rv = -EBUSY;
1909                         goto out;
1910                 }
1911
1912                 switch (lkb->lkb_wait_type) {
1913                 case DLM_MSG_LOOKUP:
1914                 case DLM_MSG_REQUEST:
1915                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1916                         rv = -EBUSY;
1917                         goto out;
1918                 case DLM_MSG_UNLOCK:
1919                 case DLM_MSG_CANCEL:
1920                         goto out;
1921                 }
1922                 /* add_to_waiters() will set OVERLAP_CANCEL */
1923                 goto out_ok;
1924         }
1925
1926         /* do we need to allow a force-unlock if there's a normal unlock
1927            already in progress?  in what conditions could the normal unlock
1928            fail such that we'd want to send a force-unlock to be sure? */
1929
1930         if (args->flags & DLM_LKF_FORCEUNLOCK) {
1931                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
1932                         goto out;
1933
1934                 if (is_overlap_unlock(lkb))
1935                         goto out;
1936
1937                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1938                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1939                         rv = -EBUSY;
1940                         goto out;
1941                 }
1942
1943                 switch (lkb->lkb_wait_type) {
1944                 case DLM_MSG_LOOKUP:
1945                 case DLM_MSG_REQUEST:
1946                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1947                         rv = -EBUSY;
1948                         goto out;
1949                 case DLM_MSG_UNLOCK:
1950                         goto out;
1951                 }
1952                 /* add_to_waiters() will set OVERLAP_UNLOCK */
1953                 goto out_ok;
1954         }
1955
1956         /* normal unlock not allowed if there's any op in progress */
1957         rv = -EBUSY;
1958         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
1959                 goto out;
1960
1961  out_ok:
1962         /* an overlapping op shouldn't blow away exflags from other op */
1963         lkb->lkb_exflags |= args->flags;
1964         lkb->lkb_sbflags = 0;
1965         lkb->lkb_astparam = args->astparam;
1966         rv = 0;
1967  out:
1968         if (rv)
1969                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
1970                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
1971                           args->flags, lkb->lkb_wait_type,
1972                           lkb->lkb_resource->res_name);
1973         return rv;
1974 }
1975
1976 /*
1977  * Four stage 4 varieties:
1978  * do_request(), do_convert(), do_unlock(), do_cancel()
1979  * These are called on the master node for the given lock and
1980  * from the central locking logic.
1981  */
1982
1983 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1984 {
1985         int error = 0;
1986
1987         if (can_be_granted(r, lkb, 1)) {
1988                 grant_lock(r, lkb);
1989                 queue_cast(r, lkb, 0);
1990                 goto out;
1991         }
1992
1993         if (can_be_queued(lkb)) {
1994                 error = -EINPROGRESS;
1995                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1996                 send_blocking_asts(r, lkb);
1997                 goto out;
1998         }
1999
2000         error = -EAGAIN;
2001         if (force_blocking_asts(lkb))
2002                 send_blocking_asts_all(r, lkb);
2003         queue_cast(r, lkb, -EAGAIN);
2004
2005  out:
2006         return error;
2007 }
2008
2009 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2010 {
2011         int error = 0;
2012
2013         /* changing an existing lock may allow others to be granted */
2014
2015         if (can_be_granted(r, lkb, 1)) {
2016                 grant_lock(r, lkb);
2017                 queue_cast(r, lkb, 0);
2018                 grant_pending_locks(r);
2019                 goto out;
2020         }
2021
2022         /* is_demoted() means the can_be_granted() above set the grmode
2023            to NL, and left us on the granted queue.  This auto-demotion
2024            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2025            now grantable.  We have to try to grant other converting locks
2026            before we try again to grant this one. */
2027
2028         if (is_demoted(lkb)) {
2029                 grant_pending_convert(r, DLM_LOCK_IV);
2030                 if (_can_be_granted(r, lkb, 1)) {
2031                         grant_lock(r, lkb);
2032                         queue_cast(r, lkb, 0);
2033                         grant_pending_locks(r);
2034                         goto out;
2035                 }
2036                 /* else fall through and move to convert queue */
2037         }
2038
2039         if (can_be_queued(lkb)) {
2040                 error = -EINPROGRESS;
2041                 del_lkb(r, lkb);
2042                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2043                 send_blocking_asts(r, lkb);
2044                 goto out;
2045         }
2046
2047         error = -EAGAIN;
2048         if (force_blocking_asts(lkb))
2049                 send_blocking_asts_all(r, lkb);
2050         queue_cast(r, lkb, -EAGAIN);
2051
2052  out:
2053         return error;
2054 }
2055
2056 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2057 {
2058         remove_lock(r, lkb);
2059         queue_cast(r, lkb, -DLM_EUNLOCK);
2060         grant_pending_locks(r);
2061         return -DLM_EUNLOCK;
2062 }
2063
2064 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2065  
2066 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2067 {
2068         int error;
2069
2070         error = revert_lock(r, lkb);
2071         if (error) {
2072                 queue_cast(r, lkb, -DLM_ECANCEL);
2073                 grant_pending_locks(r);
2074                 return -DLM_ECANCEL;
2075         }
2076         return 0;
2077 }
2078
2079 /*
2080  * Four stage 3 varieties:
2081  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2082  */
2083
2084 /* add a new lkb to a possibly new rsb, called by requesting process */
2085
2086 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2087 {
2088         int error;
2089
2090         /* set_master: sets lkb nodeid from r */
2091
2092         error = set_master(r, lkb);
2093         if (error < 0)
2094                 goto out;
2095         if (error) {
2096                 error = 0;
2097                 goto out;
2098         }
2099
2100         if (is_remote(r))
2101                 /* receive_request() calls do_request() on remote node */
2102                 error = send_request(r, lkb);
2103         else
2104                 error = do_request(r, lkb);
2105  out:
2106         return error;
2107 }
2108
2109 /* change some property of an existing lkb, e.g. mode */
2110
2111 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2112 {
2113         int error;
2114
2115         if (is_remote(r))
2116                 /* receive_convert() calls do_convert() on remote node */
2117                 error = send_convert(r, lkb);
2118         else
2119                 error = do_convert(r, lkb);
2120
2121         return error;
2122 }
2123
2124 /* remove an existing lkb from the granted queue */
2125
2126 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2127 {
2128         int error;
2129
2130         if (is_remote(r))
2131                 /* receive_unlock() calls do_unlock() on remote node */
2132                 error = send_unlock(r, lkb);
2133         else
2134                 error = do_unlock(r, lkb);
2135
2136         return error;
2137 }
2138
2139 /* remove an existing lkb from the convert or wait queue */
2140
2141 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2142 {
2143         int error;
2144
2145         if (is_remote(r))
2146                 /* receive_cancel() calls do_cancel() on remote node */
2147                 error = send_cancel(r, lkb);
2148         else
2149                 error = do_cancel(r, lkb);
2150
2151         return error;
2152 }
2153
2154 /*
2155  * Four stage 2 varieties:
2156  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2157  */
2158
2159 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2160                         int len, struct dlm_args *args)
2161 {
2162         struct dlm_rsb *r;
2163         int error;
2164
2165         error = validate_lock_args(ls, lkb, args);
2166         if (error)
2167                 goto out;
2168
2169         error = find_rsb(ls, name, len, R_CREATE, &r);
2170         if (error)
2171                 goto out;
2172
2173         lock_rsb(r);
2174
2175         attach_lkb(r, lkb);
2176         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2177
2178         error = _request_lock(r, lkb);
2179
2180         unlock_rsb(r);
2181         put_rsb(r);
2182
2183  out:
2184         return error;
2185 }
2186
2187 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2188                         struct dlm_args *args)
2189 {
2190         struct dlm_rsb *r;
2191         int error;
2192
2193         r = lkb->lkb_resource;
2194
2195         hold_rsb(r);
2196         lock_rsb(r);
2197
2198         error = validate_lock_args(ls, lkb, args);
2199         if (error)
2200                 goto out;
2201
2202         error = _convert_lock(r, lkb);
2203  out:
2204         unlock_rsb(r);
2205         put_rsb(r);
2206         return error;
2207 }
2208
2209 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2210                        struct dlm_args *args)
2211 {
2212         struct dlm_rsb *r;
2213         int error;
2214
2215         r = lkb->lkb_resource;
2216
2217         hold_rsb(r);
2218         lock_rsb(r);
2219
2220         error = validate_unlock_args(lkb, args);
2221         if (error)
2222                 goto out;
2223
2224         error = _unlock_lock(r, lkb);
2225  out:
2226         unlock_rsb(r);
2227         put_rsb(r);
2228         return error;
2229 }
2230
2231 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2232                        struct dlm_args *args)
2233 {
2234         struct dlm_rsb *r;
2235         int error;
2236
2237         r = lkb->lkb_resource;
2238
2239         hold_rsb(r);
2240         lock_rsb(r);
2241
2242         error = validate_unlock_args(lkb, args);
2243         if (error)
2244                 goto out;
2245
2246         error = _cancel_lock(r, lkb);
2247  out:
2248         unlock_rsb(r);
2249         put_rsb(r);
2250         return error;
2251 }
2252
2253 /*
2254  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2255  */
2256
2257 int dlm_lock(dlm_lockspace_t *lockspace,
2258              int mode,
2259              struct dlm_lksb *lksb,
2260              uint32_t flags,
2261              void *name,
2262              unsigned int namelen,
2263              uint32_t parent_lkid,
2264              void (*ast) (void *astarg),
2265              void *astarg,
2266              void (*bast) (void *astarg, int mode))
2267 {
2268         struct dlm_ls *ls;
2269         struct dlm_lkb *lkb;
2270         struct dlm_args args;
2271         int error, convert = flags & DLM_LKF_CONVERT;
2272
2273         ls = dlm_find_lockspace_local(lockspace);
2274         if (!ls)
2275                 return -EINVAL;
2276
2277         lock_recovery(ls);
2278
2279         if (convert)
2280                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2281         else
2282                 error = create_lkb(ls, &lkb);
2283
2284         if (error)
2285                 goto out;
2286
2287         error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
2288                               astarg, bast, &args);
2289         if (error)
2290                 goto out_put;
2291
2292         if (convert)
2293                 error = convert_lock(ls, lkb, &args);
2294         else
2295                 error = request_lock(ls, lkb, name, namelen, &args);
2296
2297         if (error == -EINPROGRESS)
2298                 error = 0;
2299  out_put:
2300         if (convert || error)
2301                 __put_lkb(ls, lkb);
2302         if (error == -EAGAIN)
2303                 error = 0;
2304  out:
2305         unlock_recovery(ls);
2306         dlm_put_lockspace(ls);
2307         return error;
2308 }
2309
2310 int dlm_unlock(dlm_lockspace_t *lockspace,
2311                uint32_t lkid,
2312                uint32_t flags,
2313                struct dlm_lksb *lksb,
2314                void *astarg)
2315 {
2316         struct dlm_ls *ls;
2317         struct dlm_lkb *lkb;
2318         struct dlm_args args;
2319         int error;
2320
2321         ls = dlm_find_lockspace_local(lockspace);
2322         if (!ls)
2323                 return -EINVAL;
2324
2325         lock_recovery(ls);
2326
2327         error = find_lkb(ls, lkid, &lkb);
2328         if (error)
2329                 goto out;
2330
2331         error = set_unlock_args(flags, astarg, &args);
2332         if (error)
2333                 goto out_put;
2334
2335         if (flags & DLM_LKF_CANCEL)
2336                 error = cancel_lock(ls, lkb, &args);
2337         else
2338                 error = unlock_lock(ls, lkb, &args);
2339
2340         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2341                 error = 0;
2342         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2343                 error = 0;
2344  out_put:
2345         dlm_put_lkb(lkb);
2346  out:
2347         unlock_recovery(ls);
2348         dlm_put_lockspace(ls);
2349         return error;
2350 }
2351
2352 /*
2353  * send/receive routines for remote operations and replies
2354  *
2355  * send_args
2356  * send_common
2357  * send_request                 receive_request
2358  * send_convert                 receive_convert
2359  * send_unlock                  receive_unlock
2360  * send_cancel                  receive_cancel
2361  * send_grant                   receive_grant
2362  * send_bast                    receive_bast
2363  * send_lookup                  receive_lookup
2364  * send_remove                  receive_remove
2365  *
2366  *                              send_common_reply
2367  * receive_request_reply        send_request_reply
2368  * receive_convert_reply        send_convert_reply
2369  * receive_unlock_reply         send_unlock_reply
2370  * receive_cancel_reply         send_cancel_reply
2371  * receive_lookup_reply         send_lookup_reply
2372  */
2373
2374 static int _create_message(struct dlm_ls *ls, int mb_len,
2375                            int to_nodeid, int mstype,
2376                            struct dlm_message **ms_ret,
2377                            struct dlm_mhandle **mh_ret)
2378 {
2379         struct dlm_message *ms;
2380         struct dlm_mhandle *mh;
2381         char *mb;
2382
2383         /* get_buffer gives us a message handle (mh) that we need to
2384            pass into lowcomms_commit and a message buffer (mb) that we
2385            write our data into */
2386
2387         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2388         if (!mh)
2389                 return -ENOBUFS;
2390
2391         memset(mb, 0, mb_len);
2392
2393         ms = (struct dlm_message *) mb;
2394
2395         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2396         ms->m_header.h_lockspace = ls->ls_global_id;
2397         ms->m_header.h_nodeid = dlm_our_nodeid();
2398         ms->m_header.h_length = mb_len;
2399         ms->m_header.h_cmd = DLM_MSG;
2400
2401         ms->m_type = mstype;
2402
2403         *mh_ret = mh;
2404         *ms_ret = ms;
2405         return 0;
2406 }
2407
2408 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2409                           int to_nodeid, int mstype,
2410                           struct dlm_message **ms_ret,
2411                           struct dlm_mhandle **mh_ret)
2412 {
2413         int mb_len = sizeof(struct dlm_message);
2414
2415         switch (mstype) {
2416         case DLM_MSG_REQUEST:
2417         case DLM_MSG_LOOKUP:
2418         case DLM_MSG_REMOVE:
2419                 mb_len += r->res_length;
2420                 break;
2421         case DLM_MSG_CONVERT:
2422         case DLM_MSG_UNLOCK:
2423         case DLM_MSG_REQUEST_REPLY:
2424         case DLM_MSG_CONVERT_REPLY:
2425         case DLM_MSG_GRANT:
2426                 if (lkb && lkb->lkb_lvbptr)
2427                         mb_len += r->res_ls->ls_lvblen;
2428                 break;
2429         }
2430
2431         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2432                                ms_ret, mh_ret);
2433 }
2434
2435 /* further lowcomms enhancements or alternate implementations may make
2436    the return value from this function useful at some point */
2437
2438 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2439 {
2440         dlm_message_out(ms);
2441         dlm_lowcomms_commit_buffer(mh);
2442         return 0;
2443 }
2444
2445 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2446                       struct dlm_message *ms)
2447 {
2448         ms->m_nodeid   = lkb->lkb_nodeid;
2449         ms->m_pid      = lkb->lkb_ownpid;
2450         ms->m_lkid     = lkb->lkb_id;
2451         ms->m_remid    = lkb->lkb_remid;
2452         ms->m_exflags  = lkb->lkb_exflags;
2453         ms->m_sbflags  = lkb->lkb_sbflags;
2454         ms->m_flags    = lkb->lkb_flags;
2455         ms->m_lvbseq   = lkb->lkb_lvbseq;
2456         ms->m_status   = lkb->lkb_status;
2457         ms->m_grmode   = lkb->lkb_grmode;
2458         ms->m_rqmode   = lkb->lkb_rqmode;
2459         ms->m_hash     = r->res_hash;
2460
2461         /* m_result and m_bastmode are set from function args,
2462            not from lkb fields */
2463
2464         if (lkb->lkb_bastaddr)
2465                 ms->m_asts |= AST_BAST;
2466         if (lkb->lkb_astaddr)
2467                 ms->m_asts |= AST_COMP;
2468
2469         /* compare with switch in create_message; send_remove() doesn't
2470            use send_args() */
2471
2472         switch (ms->m_type) {
2473         case DLM_MSG_REQUEST:
2474         case DLM_MSG_LOOKUP:
2475                 memcpy(ms->m_extra, r->res_name, r->res_length);
2476                 break;
2477         case DLM_MSG_CONVERT:
2478         case DLM_MSG_UNLOCK:
2479         case DLM_MSG_REQUEST_REPLY:
2480         case DLM_MSG_CONVERT_REPLY:
2481         case DLM_MSG_GRANT:
2482                 if (!lkb->lkb_lvbptr)
2483                         break;
2484                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2485                 break;
2486         }
2487 }
2488
2489 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2490 {
2491         struct dlm_message *ms;
2492         struct dlm_mhandle *mh;
2493         int to_nodeid, error;
2494
2495         error = add_to_waiters(lkb, mstype);
2496         if (error)
2497                 return error;
2498
2499         to_nodeid = r->res_nodeid;
2500
2501         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2502         if (error)
2503                 goto fail;
2504
2505         send_args(r, lkb, ms);
2506
2507         error = send_message(mh, ms);
2508         if (error)
2509                 goto fail;
2510         return 0;
2511
2512  fail:
2513         remove_from_waiters(lkb, msg_reply_type(mstype));
2514         return error;
2515 }
2516
2517 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2518 {
2519         return send_common(r, lkb, DLM_MSG_REQUEST);
2520 }
2521
2522 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2523 {
2524         int error;
2525
2526         error = send_common(r, lkb, DLM_MSG_CONVERT);
2527
2528         /* down conversions go without a reply from the master */
2529         if (!error && down_conversion(lkb)) {
2530                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2531                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2532                 r->res_ls->ls_stub_ms.m_result = 0;
2533                 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2534                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2535         }
2536
2537         return error;
2538 }
2539
2540 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2541    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2542    that the master is still correct. */
2543
2544 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2545 {
2546         return send_common(r, lkb, DLM_MSG_UNLOCK);
2547 }
2548
2549 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2550 {
2551         return send_common(r, lkb, DLM_MSG_CANCEL);
2552 }
2553
2554 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2555 {
2556         struct dlm_message *ms;
2557         struct dlm_mhandle *mh;
2558         int to_nodeid, error;
2559
2560         to_nodeid = lkb->lkb_nodeid;
2561
2562         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2563         if (error)
2564                 goto out;
2565
2566         send_args(r, lkb, ms);
2567
2568         ms->m_result = 0;
2569
2570         error = send_message(mh, ms);
2571  out:
2572         return error;
2573 }
2574
2575 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2576 {
2577         struct dlm_message *ms;
2578         struct dlm_mhandle *mh;
2579         int to_nodeid, error;
2580
2581         to_nodeid = lkb->lkb_nodeid;
2582
2583         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2584         if (error)
2585                 goto out;
2586
2587         send_args(r, lkb, ms);
2588
2589         ms->m_bastmode = mode;
2590
2591         error = send_message(mh, ms);
2592  out:
2593         return error;
2594 }
2595
2596 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2597 {
2598         struct dlm_message *ms;
2599         struct dlm_mhandle *mh;
2600         int to_nodeid, error;
2601
2602         error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2603         if (error)
2604                 return error;
2605
2606         to_nodeid = dlm_dir_nodeid(r);
2607
2608         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2609         if (error)
2610                 goto fail;
2611
2612         send_args(r, lkb, ms);
2613
2614         error = send_message(mh, ms);
2615         if (error)
2616                 goto fail;
2617         return 0;
2618
2619  fail:
2620         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2621         return error;
2622 }
2623
2624 static int send_remove(struct dlm_rsb *r)
2625 {
2626         struct dlm_message *ms;
2627         struct dlm_mhandle *mh;
2628         int to_nodeid, error;
2629
2630         to_nodeid = dlm_dir_nodeid(r);
2631
2632         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2633         if (error)
2634                 goto out;
2635
2636         memcpy(ms->m_extra, r->res_name, r->res_length);
2637         ms->m_hash = r->res_hash;
2638
2639         error = send_message(mh, ms);
2640  out:
2641         return error;
2642 }
2643
2644 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2645                              int mstype, int rv)
2646 {
2647         struct dlm_message *ms;
2648         struct dlm_mhandle *mh;
2649         int to_nodeid, error;
2650
2651         to_nodeid = lkb->lkb_nodeid;
2652
2653         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2654         if (error)
2655                 goto out;
2656
2657         send_args(r, lkb, ms);
2658
2659         ms->m_result = rv;
2660
2661         error = send_message(mh, ms);
2662  out:
2663         return error;
2664 }
2665
2666 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2667 {
2668         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2669 }
2670
2671 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2672 {
2673         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2674 }
2675
2676 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2677 {
2678         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2679 }
2680
2681 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2682 {
2683         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2684 }
2685
2686 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2687                              int ret_nodeid, int rv)
2688 {
2689         struct dlm_rsb *r = &ls->ls_stub_rsb;
2690         struct dlm_message *ms;
2691         struct dlm_mhandle *mh;
2692         int error, nodeid = ms_in->m_header.h_nodeid;
2693
2694         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2695         if (error)
2696                 goto out;
2697
2698         ms->m_lkid = ms_in->m_lkid;
2699         ms->m_result = rv;
2700         ms->m_nodeid = ret_nodeid;
2701
2702         error = send_message(mh, ms);
2703  out:
2704         return error;
2705 }
2706
2707 /* which args we save from a received message depends heavily on the type
2708    of message, unlike the send side where we can safely send everything about
2709    the lkb for any type of message */
2710
2711 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2712 {
2713         lkb->lkb_exflags = ms->m_exflags;
2714         lkb->lkb_sbflags = ms->m_sbflags;
2715         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2716                          (ms->m_flags & 0x0000FFFF);
2717 }
2718
2719 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2720 {
2721         lkb->lkb_sbflags = ms->m_sbflags;
2722         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2723                          (ms->m_flags & 0x0000FFFF);
2724 }
2725
2726 static int receive_extralen(struct dlm_message *ms)
2727 {
2728         return (ms->m_header.h_length - sizeof(struct dlm_message));
2729 }
2730
2731 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2732                        struct dlm_message *ms)
2733 {
2734         int len;
2735
2736         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2737                 if (!lkb->lkb_lvbptr)
2738                         lkb->lkb_lvbptr = allocate_lvb(ls);
2739                 if (!lkb->lkb_lvbptr)
2740                         return -ENOMEM;
2741                 len = receive_extralen(ms);
2742                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2743         }
2744         return 0;
2745 }
2746
2747 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2748                                 struct dlm_message *ms)
2749 {
2750         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2751         lkb->lkb_ownpid = ms->m_pid;
2752         lkb->lkb_remid = ms->m_lkid;
2753         lkb->lkb_grmode = DLM_LOCK_IV;
2754         lkb->lkb_rqmode = ms->m_rqmode;
2755         lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2756         lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2757
2758         DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2759
2760         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2761                 /* lkb was just created so there won't be an lvb yet */
2762                 lkb->lkb_lvbptr = allocate_lvb(ls);
2763                 if (!lkb->lkb_lvbptr)
2764                         return -ENOMEM;
2765         }
2766
2767         return 0;
2768 }
2769
2770 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2771                                 struct dlm_message *ms)
2772 {
2773         if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2774                 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2775                           lkb->lkb_nodeid, ms->m_header.h_nodeid,
2776                           lkb->lkb_id, lkb->lkb_remid);
2777                 return -EINVAL;
2778         }
2779
2780         if (!is_master_copy(lkb))
2781                 return -EINVAL;
2782
2783         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2784                 return -EBUSY;
2785
2786         if (receive_lvb(ls, lkb, ms))
2787                 return -ENOMEM;
2788
2789         lkb->lkb_rqmode = ms->m_rqmode;
2790         lkb->lkb_lvbseq = ms->m_lvbseq;
2791
2792         return 0;
2793 }
2794
2795 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2796                                struct dlm_message *ms)
2797 {
2798         if (!is_master_copy(lkb))
2799                 return -EINVAL;
2800         if (receive_lvb(ls, lkb, ms))
2801                 return -ENOMEM;
2802         return 0;
2803 }
2804
2805 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2806    uses to send a reply and that the remote end uses to process the reply. */
2807
2808 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2809 {
2810         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2811         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2812         lkb->lkb_remid = ms->m_lkid;
2813 }
2814
2815 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2816 {
2817         struct dlm_lkb *lkb;
2818         struct dlm_rsb *r;
2819         int error, namelen;
2820
2821         error = create_lkb(ls, &lkb);
2822         if (error)
2823                 goto fail;
2824
2825         receive_flags(lkb, ms);
2826         lkb->lkb_flags |= DLM_IFL_MSTCPY;
2827         error = receive_request_args(ls, lkb, ms);
2828         if (error) {
2829                 __put_lkb(ls, lkb);
2830                 goto fail;
2831         }
2832
2833         namelen = receive_extralen(ms);
2834
2835         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2836         if (error) {
2837                 __put_lkb(ls, lkb);
2838                 goto fail;
2839         }
2840
2841         lock_rsb(r);
2842
2843         attach_lkb(r, lkb);
2844         error = do_request(r, lkb);
2845         send_request_reply(r, lkb, error);
2846
2847         unlock_rsb(r);
2848         put_rsb(r);
2849
2850         if (error == -EINPROGRESS)
2851                 error = 0;
2852         if (error)
2853                 dlm_put_lkb(lkb);
2854         return;
2855
2856  fail:
2857         setup_stub_lkb(ls, ms);
2858         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2859 }
2860
2861 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2862 {
2863         struct dlm_lkb *lkb;
2864         struct dlm_rsb *r;
2865         int error, reply = 1;
2866
2867         error = find_lkb(ls, ms->m_remid, &lkb);
2868         if (error)
2869                 goto fail;
2870
2871         r = lkb->lkb_resource;
2872
2873         hold_rsb(r);
2874         lock_rsb(r);
2875
2876         receive_flags(lkb, ms);
2877         error = receive_convert_args(ls, lkb, ms);
2878         if (error)
2879                 goto out;
2880         reply = !down_conversion(lkb);
2881
2882         error = do_convert(r, lkb);
2883  out:
2884         if (reply)
2885                 send_convert_reply(r, lkb, error);
2886
2887         unlock_rsb(r);
2888         put_rsb(r);
2889         dlm_put_lkb(lkb);
2890         return;
2891
2892  fail:
2893         setup_stub_lkb(ls, ms);
2894         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2895 }
2896
2897 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2898 {
2899         struct dlm_lkb *lkb;
2900         struct dlm_rsb *r;
2901         int error;
2902
2903         error = find_lkb(ls, ms->m_remid, &lkb);
2904         if (error)
2905                 goto fail;
2906
2907         r = lkb->lkb_resource;
2908
2909         hold_rsb(r);
2910         lock_rsb(r);
2911
2912         receive_flags(lkb, ms);
2913         error = receive_unlock_args(ls, lkb, ms);
2914         if (error)
2915                 goto out;
2916
2917         error = do_unlock(r, lkb);
2918  out:
2919         send_unlock_reply(r, lkb, error);
2920
2921         unlock_rsb(r);
2922         put_rsb(r);
2923         dlm_put_lkb(lkb);
2924         return;
2925
2926  fail:
2927         setup_stub_lkb(ls, ms);
2928         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2929 }
2930
2931 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2932 {
2933         struct dlm_lkb *lkb;
2934         struct dlm_rsb *r;
2935         int error;
2936
2937         error = find_lkb(ls, ms->m_remid, &lkb);
2938         if (error)
2939                 goto fail;
2940
2941         receive_flags(lkb, ms);
2942
2943         r = lkb->lkb_resource;
2944
2945         hold_rsb(r);
2946         lock_rsb(r);
2947
2948         error = do_cancel(r, lkb);
2949         send_cancel_reply(r, lkb, error);
2950
2951         unlock_rsb(r);
2952         put_rsb(r);
2953         dlm_put_lkb(lkb);
2954         return;
2955
2956  fail:
2957         setup_stub_lkb(ls, ms);
2958         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2959 }
2960
2961 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2962 {
2963         struct dlm_lkb *lkb;
2964         struct dlm_rsb *r;
2965         int error;
2966
2967         error = find_lkb(ls, ms->m_remid, &lkb);
2968         if (error) {
2969                 log_error(ls, "receive_grant no lkb");
2970                 return;
2971         }
2972         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2973
2974         r = lkb->lkb_resource;
2975
2976         hold_rsb(r);
2977         lock_rsb(r);
2978
2979         receive_flags_reply(lkb, ms);
2980         if (is_altmode(lkb))
2981                 munge_altmode(lkb, ms);
2982         grant_lock_pc(r, lkb, ms);
2983         queue_cast(r, lkb, 0);
2984
2985         unlock_rsb(r);
2986         put_rsb(r);
2987         dlm_put_lkb(lkb);
2988 }
2989
2990 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2991 {
2992         struct dlm_lkb *lkb;
2993         struct dlm_rsb *r;
2994         int error;
2995
2996         error = find_lkb(ls, ms->m_remid, &lkb);
2997         if (error) {
2998                 log_error(ls, "receive_bast no lkb");
2999                 return;
3000         }
3001         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3002
3003         r = lkb->lkb_resource;
3004
3005         hold_rsb(r);
3006         lock_rsb(r);
3007
3008         queue_bast(r, lkb, ms->m_bastmode);
3009
3010         unlock_rsb(r);
3011         put_rsb(r);
3012         dlm_put_lkb(lkb);
3013 }
3014
3015 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3016 {
3017         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3018
3019         from_nodeid = ms->m_header.h_nodeid;
3020         our_nodeid = dlm_our_nodeid();
3021
3022         len = receive_extralen(ms);
3023
3024         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3025         if (dir_nodeid != our_nodeid) {
3026                 log_error(ls, "lookup dir_nodeid %d from %d",
3027                           dir_nodeid, from_nodeid);
3028                 error = -EINVAL;
3029                 ret_nodeid = -1;
3030                 goto out;
3031         }
3032
3033         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3034
3035         /* Optimization: we're master so treat lookup as a request */
3036         if (!error && ret_nodeid == our_nodeid) {
3037                 receive_request(ls, ms);
3038                 return;
3039         }
3040  out:
3041         send_lookup_reply(ls, ms, ret_nodeid, error);
3042 }
3043
3044 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3045 {
3046         int len, dir_nodeid, from_nodeid;
3047
3048         from_nodeid = ms->m_header.h_nodeid;
3049
3050         len = receive_extralen(ms);
3051
3052         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3053         if (dir_nodeid != dlm_our_nodeid()) {
3054                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3055                           dir_nodeid, from_nodeid);
3056                 return;
3057         }
3058
3059         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3060 }
3061
3062 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3063 {
3064         do_purge(ls, ms->m_nodeid, ms->m_pid);
3065 }
3066
3067 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3068 {
3069         struct dlm_lkb *lkb;
3070         struct dlm_rsb *r;
3071         int error, mstype, result;
3072
3073         error = find_lkb(ls, ms->m_remid, &lkb);
3074         if (error) {
3075                 log_error(ls, "receive_request_reply no lkb");
3076                 return;
3077         }
3078         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3079
3080         r = lkb->lkb_resource;
3081         hold_rsb(r);
3082         lock_rsb(r);
3083
3084         mstype = lkb->lkb_wait_type;
3085         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3086         if (error)
3087                 goto out;
3088
3089         /* Optimization: the dir node was also the master, so it took our
3090            lookup as a request and sent request reply instead of lookup reply */
3091         if (mstype == DLM_MSG_LOOKUP) {
3092                 r->res_nodeid = ms->m_header.h_nodeid;
3093                 lkb->lkb_nodeid = r->res_nodeid;
3094         }
3095
3096         /* this is the value returned from do_request() on the master */
3097         result = ms->m_result;
3098
3099         switch (result) {
3100         case -EAGAIN:
3101                 /* request would block (be queued) on remote master */
3102                 queue_cast(r, lkb, -EAGAIN);
3103                 confirm_master(r, -EAGAIN);
3104                 unhold_lkb(lkb); /* undoes create_lkb() */
3105                 break;
3106
3107         case -EINPROGRESS:
3108         case 0:
3109                 /* request was queued or granted on remote master */
3110                 receive_flags_reply(lkb, ms);
3111                 lkb->lkb_remid = ms->m_lkid;
3112                 if (is_altmode(lkb))
3113                         munge_altmode(lkb, ms);
3114                 if (result)
3115                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3116                 else {
3117                         grant_lock_pc(r, lkb, ms);
3118                         queue_cast(r, lkb, 0);
3119                 }
3120                 confirm_master(r, result);
3121                 break;
3122
3123         case -EBADR:
3124         case -ENOTBLK:
3125                 /* find_rsb failed to find rsb or rsb wasn't master */
3126                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3127                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3128                 r->res_nodeid = -1;
3129                 lkb->lkb_nodeid = -1;
3130
3131                 if (is_overlap(lkb)) {
3132                         /* we'll ignore error in cancel/unlock reply */
3133                         queue_cast_overlap(r, lkb);
3134                         unhold_lkb(lkb); /* undoes create_lkb() */
3135                 } else
3136                         _request_lock(r, lkb);
3137                 break;
3138
3139         default:
3140                 log_error(ls, "receive_request_reply %x error %d",
3141                           lkb->lkb_id, result);
3142         }
3143
3144         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3145                 log_debug(ls, "receive_request_reply %x result %d unlock",
3146                           lkb->lkb_id, result);
3147                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3148                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3149                 send_unlock(r, lkb);
3150         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3151                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3152                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3153                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3154                 send_cancel(r, lkb);
3155         } else {
3156                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3157                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3158         }
3159  out:
3160         unlock_rsb(r);
3161         put_rsb(r);
3162         dlm_put_lkb(lkb);
3163 }
3164
3165 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3166                                     struct dlm_message *ms)
3167 {
3168         /* this is the value returned from do_convert() on the master */
3169         switch (ms->m_result) {
3170         case -EAGAIN:
3171                 /* convert would block (be queued) on remote master */
3172                 queue_cast(r, lkb, -EAGAIN);
3173                 break;
3174
3175         case -EINPROGRESS:
3176                 /* convert was queued on remote master */
3177                 receive_flags_reply(lkb, ms);
3178                 if (is_demoted(lkb))
3179                         munge_demoted(lkb, ms);
3180                 del_lkb(r, lkb);
3181                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3182                 break;
3183
3184         case 0:
3185                 /* convert was granted on remote master */
3186                 receive_flags_reply(lkb, ms);
3187                 if (is_demoted(lkb))
3188                         munge_demoted(lkb, ms);
3189                 grant_lock_pc(r, lkb, ms);
3190                 queue_cast(r, lkb, 0);
3191                 break;
3192
3193         default:
3194                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3195                           lkb->lkb_id, ms->m_result);
3196         }
3197 }
3198
3199 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3200 {
3201         struct dlm_rsb *r = lkb->lkb_resource;
3202         int error;
3203
3204         hold_rsb(r);
3205         lock_rsb(r);
3206
3207         /* stub reply can happen with waiters_mutex held */
3208         error = remove_from_waiters_ms(lkb, ms);
3209         if (error)
3210                 goto out;
3211
3212         __receive_convert_reply(r, lkb, ms);
3213  out:
3214         unlock_rsb(r);
3215         put_rsb(r);
3216 }
3217
3218 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3219 {
3220         struct dlm_lkb *lkb;
3221         int error;
3222
3223         error = find_lkb(ls, ms->m_remid, &lkb);
3224         if (error) {
3225                 log_error(ls, "receive_convert_reply no lkb");
3226                 return;
3227         }
3228         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3229
3230         _receive_convert_reply(lkb, ms);
3231         dlm_put_lkb(lkb);
3232 }
3233
3234 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3235 {
3236         struct dlm_rsb *r = lkb->lkb_resource;
3237         int error;
3238
3239         hold_rsb(r);
3240         lock_rsb(r);
3241
3242         /* stub reply can happen with waiters_mutex held */
3243         error = remove_from_waiters_ms(lkb, ms);
3244         if (error)
3245                 goto out;
3246
3247         /* this is the value returned from do_unlock() on the master */
3248
3249         switch (ms->m_result) {
3250         case -DLM_EUNLOCK:
3251                 receive_flags_reply(lkb, ms);
3252                 remove_lock_pc(r, lkb);
3253                 queue_cast(r, lkb, -DLM_EUNLOCK);
3254                 break;
3255         case -ENOENT:
3256                 break;
3257         default:
3258                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3259                           lkb->lkb_id, ms->m_result);
3260         }
3261  out:
3262         unlock_rsb(r);
3263         put_rsb(r);
3264 }
3265
3266 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3267 {
3268         struct dlm_lkb *lkb;
3269         int error;
3270
3271         error = find_lkb(ls, ms->m_remid, &lkb);
3272         if (error) {
3273                 log_error(ls, "receive_unlock_reply no lkb");
3274                 return;
3275         }
3276         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3277
3278         _receive_unlock_reply(lkb, ms);
3279         dlm_put_lkb(lkb);
3280 }
3281
3282 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3283 {
3284         struct dlm_rsb *r = lkb->lkb_resource;
3285         int error;
3286
3287         hold_rsb(r);
3288         lock_rsb(r);
3289
3290         /* stub reply can happen with waiters_mutex held */
3291         error = remove_from_waiters_ms(lkb, ms);
3292         if (error)
3293                 goto out;
3294
3295         /* this is the value returned from do_cancel() on the master */
3296
3297         switch (ms->m_result) {
3298         case -DLM_ECANCEL:
3299                 receive_flags_reply(lkb, ms);
3300                 revert_lock_pc(r, lkb);
3301                 if (ms->m_result)
3302                         queue_cast(r, lkb, -DLM_ECANCEL);
3303                 break;
3304         case 0:
3305                 break;
3306         default:
3307                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3308                           lkb->lkb_id, ms->m_result);
3309         }
3310  out:
3311         unlock_rsb(r);
3312         put_rsb(r);
3313 }
3314
3315 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3316 {
3317         struct dlm_lkb *lkb;
3318         int error;
3319
3320         error = find_lkb(ls, ms->m_remid, &lkb);
3321         if (error) {
3322                 log_error(ls, "receive_cancel_reply no lkb");
3323                 return;
3324         }
3325         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3326
3327         _receive_cancel_reply(lkb, ms);
3328         dlm_put_lkb(lkb);
3329 }
3330
3331 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3332 {
3333         struct dlm_lkb *lkb;
3334         struct dlm_rsb *r;
3335         int error, ret_nodeid;
3336
3337         error = find_lkb(ls, ms->m_lkid, &lkb);
3338         if (error) {
3339                 log_error(ls, "receive_lookup_reply no lkb");
3340                 return;
3341         }
3342
3343         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3344            FIXME: will a non-zero error ever be returned? */
3345
3346         r = lkb->lkb_resource;
3347         hold_rsb(r);
3348         lock_rsb(r);
3349
3350         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3351         if (error)
3352                 goto out;
3353
3354         ret_nodeid = ms->m_nodeid;
3355         if (ret_nodeid == dlm_our_nodeid()) {
3356                 r->res_nodeid = 0;
3357                 ret_nodeid = 0;
3358                 r->res_first_lkid = 0;
3359         } else {
3360                 /* set_master() will copy res_nodeid to lkb_nodeid */
3361                 r->res_nodeid = ret_nodeid;
3362         }
3363
3364         if (is_overlap(lkb)) {
3365                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3366                           lkb->lkb_id, lkb->lkb_flags);
3367                 queue_cast_overlap(r, lkb);
3368                 unhold_lkb(lkb); /* undoes create_lkb() */
3369                 goto out_list;
3370         }
3371
3372         _request_lock(r, lkb);
3373
3374  out_list:
3375         if (!ret_nodeid)
3376                 process_lookup_list(r);
3377  out:
3378         unlock_rsb(r);
3379         put_rsb(r);
3380         dlm_put_lkb(lkb);
3381 }
3382
3383 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3384 {
3385         struct dlm_message *ms = (struct dlm_message *) hd;
3386         struct dlm_ls *ls;
3387         int error = 0;
3388
3389         if (!recovery)
3390                 dlm_message_in(ms);
3391
3392         ls = dlm_find_lockspace_global(hd->h_lockspace);
3393         if (!ls) {
3394                 log_print("drop message %d from %d for unknown lockspace %d",
3395                           ms->m_type, nodeid, hd->h_lockspace);
3396                 return -EINVAL;
3397         }
3398
3399         /* recovery may have just ended leaving a bunch of backed-up requests
3400            in the requestqueue; wait while dlm_recoverd clears them */
3401
3402         if (!recovery)
3403                 dlm_wait_requestqueue(ls);
3404
3405         /* recovery may have just started while there were a bunch of
3406            in-flight requests -- save them in requestqueue to be processed
3407            after recovery.  we can't let dlm_recvd block on the recovery
3408            lock.  if dlm_recoverd is calling this function to clear the
3409            requestqueue, it needs to be interrupted (-EINTR) if another
3410            recovery operation is starting. */
3411
3412         while (1) {
3413                 if (dlm_locking_stopped(ls)) {
3414                         if (recovery) {
3415                                 error = -EINTR;
3416                                 goto out;
3417                         }
3418                         error = dlm_add_requestqueue(ls, nodeid, hd);
3419                         if (error == -EAGAIN)
3420                                 continue;
3421                         else {
3422                                 error = -EINTR;
3423                                 goto out;
3424                         }
3425                 }
3426
3427                 if (lock_recovery_try(ls))
3428                         break;
3429                 schedule();
3430         }
3431
3432         switch (ms->m_type) {
3433
3434         /* messages sent to a master node */
3435
3436         case DLM_MSG_REQUEST:
3437                 receive_request(ls, ms);
3438                 break;
3439
3440         case DLM_MSG_CONVERT:
3441                 receive_convert(ls, ms);
3442                 break;
3443
3444         case DLM_MSG_UNLOCK:
3445                 receive_unlock(ls, ms);
3446                 break;
3447
3448         case DLM_MSG_CANCEL:
3449                 receive_cancel(ls, ms);
3450                 break;
3451
3452         /* messages sent from a master node (replies to above) */
3453
3454         case DLM_MSG_REQUEST_REPLY:
3455                 receive_request_reply(ls, ms);
3456                 break;
3457
3458         case DLM_MSG_CONVERT_REPLY:
3459                 receive_convert_reply(ls, ms);
3460                 break;
3461
3462         case DLM_MSG_UNLOCK_REPLY:
3463                 receive_unlock_reply(ls, ms);
3464                 break;
3465
3466         case DLM_MSG_CANCEL_REPLY:
3467                 receive_cancel_reply(ls, ms);
3468                 break;
3469
3470         /* messages sent from a master node (only two types of async msg) */
3471
3472         case DLM_MSG_GRANT:
3473                 receive_grant(ls, ms);
3474                 break;
3475
3476         case DLM_MSG_BAST:
3477                 receive_bast(ls, ms);
3478                 break;
3479
3480         /* messages sent to a dir node */
3481
3482         case DLM_MSG_LOOKUP:
3483                 receive_lookup(ls, ms);
3484                 break;
3485
3486         case DLM_MSG_REMOVE:
3487                 receive_remove(ls, ms);
3488                 break;
3489
3490         /* messages sent from a dir node (remove has no reply) */
3491
3492         case DLM_MSG_LOOKUP_REPLY:
3493                 receive_lookup_reply(ls, ms);
3494                 break;
3495
3496         /* other messages */
3497
3498         case DLM_MSG_PURGE:
3499                 receive_purge(ls, ms);
3500                 break;
3501
3502         default:
3503                 log_error(ls, "unknown message type %d", ms->m_type);
3504         }
3505
3506         unlock_recovery(ls);
3507  out:
3508         dlm_put_lockspace(ls);
3509         dlm_astd_wake();
3510         return error;
3511 }
3512
3513
3514 /*
3515  * Recovery related
3516  */
3517
3518 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3519 {
3520         if (middle_conversion(lkb)) {
3521                 hold_lkb(lkb);
3522                 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3523                 ls->ls_stub_ms.m_result = -EINPROGRESS;
3524                 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3525                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3526
3527                 /* Same special case as in receive_rcom_lock_args() */
3528                 lkb->lkb_grmode = DLM_LOCK_IV;
3529                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3530                 unhold_lkb(lkb);
3531
3532         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3533                 lkb->lkb_flags |= DLM_IFL_RESEND;
3534         }
3535
3536         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3537            conversions are async; there's no reply from the remote master */
3538 }
3539
3540 /* A waiting lkb needs recovery if the master node has failed, or
3541    the master node is changing (only when no directory is used) */
3542
3543 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3544 {
3545         if (dlm_is_removed(ls, lkb->lkb_nodeid))
3546                 return 1;
3547
3548         if (!dlm_no_directory(ls))
3549                 return 0;
3550
3551         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3552                 return 1;
3553
3554         return 0;
3555 }
3556
3557 /* Recovery for locks that are waiting for replies from nodes that are now
3558    gone.  We can just complete unlocks and cancels by faking a reply from the
3559    dead node.  Requests and up-conversions we flag to be resent after
3560    recovery.  Down-conversions can just be completed with a fake reply like
3561    unlocks.  Conversions between PR and CW need special attention. */
3562
3563 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3564 {
3565         struct dlm_lkb *lkb, *safe;
3566
3567         mutex_lock(&ls->ls_waiters_mutex);
3568
3569         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3570                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3571                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3572
3573                 /* all outstanding lookups, regardless of destination  will be
3574                    resent after recovery is done */
3575
3576                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3577                         lkb->lkb_flags |= DLM_IFL_RESEND;
3578                         continue;
3579                 }
3580
3581                 if (!waiter_needs_recovery(ls, lkb))
3582                         continue;
3583
3584                 switch (lkb->lkb_wait_type) {
3585
3586                 case DLM_MSG_REQUEST:
3587                         lkb->lkb_flags |= DLM_IFL_RESEND;
3588                         break;
3589
3590                 case DLM_MSG_CONVERT:
3591                         recover_convert_waiter(ls, lkb);
3592                         break;
3593
3594                 case DLM_MSG_UNLOCK:
3595                         hold_lkb(lkb);
3596                         ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3597                         ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3598                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3599                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3600                         dlm_put_lkb(lkb);
3601                         break;
3602
3603                 case DLM_MSG_CANCEL:
3604                         hold_lkb(lkb);
3605                         ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3606                         ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3607                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3608                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3609                         dlm_put_lkb(lkb);
3610                         break;
3611
3612                 default:
3613                         log_error(ls, "invalid lkb wait_type %d",
3614                                   lkb->lkb_wait_type);
3615                 }
3616                 schedule();
3617         }
3618         mutex_unlock(&ls->ls_waiters_mutex);
3619 }
3620
3621 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
3622 {
3623         struct dlm_lkb *lkb;
3624         int found = 0;
3625
3626         mutex_lock(&ls->ls_waiters_mutex);
3627         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3628                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3629                         hold_lkb(lkb);
3630                         found = 1;
3631                         break;
3632                 }
3633         }
3634         mutex_unlock(&ls->ls_waiters_mutex);
3635
3636         if (!found)
3637                 lkb = NULL;
3638         return lkb;
3639 }
3640
3641 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
3642    master or dir-node for r.  Processing the lkb may result in it being placed
3643    back on waiters. */
3644
3645 /* We do this after normal locking has been enabled and any saved messages
3646    (in requestqueue) have been processed.  We should be confident that at
3647    this point we won't get or process a reply to any of these waiting
3648    operations.  But, new ops may be coming in on the rsbs/locks here from
3649    userspace or remotely. */
3650
3651 /* there may have been an overlap unlock/cancel prior to recovery or after
3652    recovery.  if before, the lkb may still have a pos wait_count; if after, the
3653    overlap flag would just have been set and nothing new sent.  we can be
3654    confident here than any replies to either the initial op or overlap ops
3655    prior to recovery have been received. */
3656
3657 int dlm_recover_waiters_post(struct dlm_ls *ls)
3658 {
3659         struct dlm_lkb *lkb;
3660         struct dlm_rsb *r;
3661         int error = 0, mstype, err, oc, ou;
3662
3663         while (1) {
3664                 if (dlm_locking_stopped(ls)) {
3665                         log_debug(ls, "recover_waiters_post aborted");
3666                         error = -EINTR;
3667                         break;
3668                 }
3669
3670                 lkb = find_resend_waiter(ls);
3671                 if (!lkb)
3672                         break;
3673
3674                 r = lkb->lkb_resource;
3675                 hold_rsb(r);
3676                 lock_rsb(r);
3677
3678                 mstype = lkb->lkb_wait_type;
3679                 oc = is_overlap_cancel(lkb);
3680                 ou = is_overlap_unlock(lkb);
3681                 err = 0;
3682
3683                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3684                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3685
3686                 /* At this point we assume that we won't get a reply to any
3687                    previous op or overlap op on this lock.  First, do a big
3688                    remove_from_waiters() for all previous ops. */
3689
3690                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3691                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3692                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3693                 lkb->lkb_wait_type = 0;
3694                 lkb->lkb_wait_count = 0;
3695                 mutex_lock(&ls->ls_waiters_mutex);
3696                 list_del_init(&lkb->lkb_wait_reply);
3697                 mutex_unlock(&ls->ls_waiters_mutex);
3698                 unhold_lkb(lkb); /* for waiters list */
3699
3700                 if (oc || ou) {
3701                         /* do an unlock or cancel instead of resending */
3702                         switch (mstype) {
3703                         case DLM_MSG_LOOKUP:
3704                         case DLM_MSG_REQUEST:
3705                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
3706                                                         -DLM_ECANCEL);
3707                                 unhold_lkb(lkb); /* undoes create_lkb() */
3708                                 break;
3709                         case DLM_MSG_CONVERT:
3710                                 if (oc) {
3711                                         queue_cast(r, lkb, -DLM_ECANCEL);
3712                                 } else {
3713                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
3714                                         _unlock_lock(r, lkb);
3715                                 }
3716                                 break;
3717                         default:
3718                                 err = 1;
3719                         }
3720                 } else {
3721                         switch (mstype) {
3722                         case DLM_MSG_LOOKUP:
3723                         case DLM_MSG_REQUEST:
3724                                 _request_lock(r, lkb);
3725                                 if (is_master(r))
3726                                         confirm_master(r, 0);
3727                                 break;
3728                         case DLM_MSG_CONVERT:
3729                                 _convert_lock(r, lkb);
3730                                 break;
3731                         default:
3732                                 err = 1;
3733                         }
3734                 }
3735
3736                 if (err)
3737                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
3738                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
3739                 unlock_rsb(r);
3740                 put_rsb(r);
3741                 dlm_put_lkb(lkb);
3742         }
3743
3744         return error;
3745 }
3746
3747 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3748                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3749 {
3750         struct dlm_ls *ls = r->res_ls;
3751         struct dlm_lkb *lkb, *safe;
3752
3753         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3754                 if (test(ls, lkb)) {
3755                         rsb_set_flag(r, RSB_LOCKS_PURGED);
3756                         del_lkb(r, lkb);
3757                         /* this put should free the lkb */
3758                         if (!dlm_put_lkb(lkb))
3759                                 log_error(ls, "purged lkb not released");
3760                 }
3761         }
3762 }
3763
3764 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3765 {
3766         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3767 }
3768
3769 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3770 {
3771         return is_master_copy(lkb);
3772 }
3773
3774 static void purge_dead_locks(struct dlm_rsb *r)
3775 {
3776         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3777         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3778         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3779 }
3780
3781 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3782 {
3783         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3784         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3785         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3786 }
3787
3788 /* Get rid of locks held by nodes that are gone. */
3789
3790 int dlm_purge_locks(struct dlm_ls *ls)
3791 {
3792         struct dlm_rsb *r;
3793
3794         log_debug(ls, "dlm_purge_locks");
3795
3796         down_write(&ls->ls_root_sem);
3797         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3798                 hold_rsb(r);
3799                 lock_rsb(r);
3800                 if (is_master(r))
3801                         purge_dead_locks(r);
3802                 unlock_rsb(r);
3803                 unhold_rsb(r);
3804
3805                 schedule();
3806         }
3807         up_write(&ls->ls_root_sem);
3808
3809         return 0;
3810 }
3811
3812 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3813 {
3814         struct dlm_rsb *r, *r_ret = NULL;
3815
3816         read_lock(&ls->ls_rsbtbl[bucket].lock);
3817         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3818                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
3819                         continue;
3820                 hold_rsb(r);
3821                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
3822                 r_ret = r;
3823                 break;
3824         }
3825         read_unlock(&ls->ls_rsbtbl[bucket].lock);
3826         return r_ret;
3827 }
3828
3829 void dlm_grant_after_purge(struct dlm_ls *ls)
3830 {
3831         struct dlm_rsb *r;
3832         int bucket = 0;
3833
3834         while (1) {
3835                 r = find_purged_rsb(ls, bucket);
3836                 if (!r) {
3837                         if (bucket == ls->ls_rsbtbl_size - 1)
3838                                 break;
3839                         bucket++;
3840                         continue;
3841                 }
3842                 lock_rsb(r);
3843                 if (is_master(r)) {
3844                         grant_pending_locks(r);
3845                         confirm_master(r, 0);
3846                 }
3847                 unlock_rsb(r);
3848                 put_rsb(r);
3849                 schedule();
3850         }
3851 }
3852
3853 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3854                                          uint32_t remid)
3855 {
3856         struct dlm_lkb *lkb;
3857
3858         list_for_each_entry(lkb, head, lkb_statequeue) {
3859                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3860                         return lkb;
3861         }
3862         return NULL;
3863 }
3864
3865 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3866                                     uint32_t remid)
3867 {
3868         struct dlm_lkb *lkb;
3869
3870         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3871         if (lkb)
3872                 return lkb;
3873         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3874         if (lkb)
3875                 return lkb;
3876         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3877         if (lkb)
3878                 return lkb;
3879         return NULL;
3880 }
3881
3882 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3883                                   struct dlm_rsb *r, struct dlm_rcom *rc)
3884 {
3885         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3886         int lvblen;
3887
3888         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3889         lkb->lkb_ownpid = rl->rl_ownpid;
3890         lkb->lkb_remid = rl->rl_lkid;
3891         lkb->lkb_exflags = rl->rl_exflags;
3892         lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3893         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3894         lkb->lkb_lvbseq = rl->rl_lvbseq;
3895         lkb->lkb_rqmode = rl->rl_rqmode;
3896         lkb->lkb_grmode = rl->rl_grmode;
3897         /* don't set lkb_status because add_lkb wants to itself */
3898
3899         lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3900         lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3901
3902         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3903                 lkb->lkb_lvbptr = allocate_lvb(ls);
3904                 if (!lkb->lkb_lvbptr)
3905                         return -ENOMEM;
3906                 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3907                          sizeof(struct rcom_lock);
3908                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3909         }
3910
3911         /* Conversions between PR and CW (middle modes) need special handling.
3912            The real granted mode of these converting locks cannot be determined
3913            until all locks have been rebuilt on the rsb (recover_conversion) */
3914
3915         if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3916                 rl->rl_status = DLM_LKSTS_CONVERT;
3917                 lkb->lkb_grmode = DLM_LOCK_IV;
3918                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3919         }
3920
3921         return 0;
3922 }
3923
3924 /* This lkb may have been recovered in a previous aborted recovery so we need
3925    to check if the rsb already has an lkb with the given remote nodeid/lkid.
3926    If so we just send back a standard reply.  If not, we create a new lkb with
3927    the given values and send back our lkid.  We send back our lkid by sending
3928    back the rcom_lock struct we got but with the remid field filled in. */
3929
3930 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3931 {
3932         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3933         struct dlm_rsb *r;
3934         struct dlm_lkb *lkb;
3935         int error;
3936
3937         if (rl->rl_parent_lkid) {
3938                 error = -EOPNOTSUPP;
3939                 goto out;
3940         }
3941
3942         error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3943         if (error)
3944                 goto out;
3945
3946         lock_rsb(r);
3947
3948         lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3949         if (lkb) {
3950                 error = -EEXIST;
3951                 goto out_remid;
3952         }
3953
3954         error = create_lkb(ls, &lkb);
3955         if (error)
3956                 goto out_unlock;
3957
3958         error = receive_rcom_lock_args(ls, lkb, r, rc);
3959         if (error) {
3960                 __put_lkb(ls, lkb);
3961                 goto out_unlock;
3962         }
3963
3964         attach_lkb(r, lkb);
3965         add_lkb(r, lkb, rl->rl_status);
3966         error = 0;
3967
3968  out_remid:
3969         /* this is the new value returned to the lock holder for
3970            saving in its process-copy lkb */
3971         rl->rl_remid = lkb->lkb_id;
3972
3973  out_unlock:
3974         unlock_rsb(r);
3975         put_rsb(r);
3976  out:
3977         if (error)
3978                 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3979         rl->rl_result = error;
3980         return error;
3981 }
3982
3983 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3984 {
3985         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3986         struct dlm_rsb *r;
3987         struct dlm_lkb *lkb;
3988         int error;
3989
3990         error = find_lkb(ls, rl->rl_lkid, &lkb);
3991         if (error) {
3992                 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3993                 return error;
3994         }
3995
3996         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3997
3998         error = rl->rl_result;
3999
4000         r = lkb->lkb_resource;
4001         hold_rsb(r);
4002         lock_rsb(r);
4003
4004         switch (error) {
4005         case -EBADR:
4006                 /* There's a chance the new master received our lock before
4007                    dlm_recover_master_reply(), this wouldn't happen if we did
4008                    a barrier between recover_masters and recover_locks. */
4009                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4010                           (unsigned long)r, r->res_name);
4011                 dlm_send_rcom_lock(r, lkb);
4012                 goto out;
4013         case -EEXIST:
4014                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4015                 /* fall through */
4016         case 0:
4017                 lkb->lkb_remid = rl->rl_remid;
4018                 break;
4019         default:
4020                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4021                           error, lkb->lkb_id);
4022         }
4023
4024         /* an ack for dlm_recover_locks() which waits for replies from
4025            all the locks it sends to new masters */
4026         dlm_recovered_lock(r);
4027  out:
4028         unlock_rsb(r);
4029         put_rsb(r);
4030         dlm_put_lkb(lkb);
4031
4032         return 0;
4033 }
4034
4035 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4036                      int mode, uint32_t flags, void *name, unsigned int namelen,
4037                      uint32_t parent_lkid)
4038 {
4039         struct dlm_lkb *lkb;
4040         struct dlm_args args;
4041         int error;
4042
4043         lock_recovery(ls);
4044
4045         error = create_lkb(ls, &lkb);
4046         if (error) {
4047                 kfree(ua);
4048                 goto out;
4049         }
4050
4051         if (flags & DLM_LKF_VALBLK) {
4052                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4053                 if (!ua->lksb.sb_lvbptr) {
4054                         kfree(ua);
4055                         __put_lkb(ls, lkb);
4056                         error = -ENOMEM;
4057                         goto out;
4058                 }
4059         }
4060
4061         /* After ua is attached to lkb it will be freed by free_lkb().
4062            When DLM_IFL_USER is set, the dlm knows that this is a userspace
4063            lock and that lkb_astparam is the dlm_user_args structure. */
4064
4065         error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
4066                               DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4067         lkb->lkb_flags |= DLM_IFL_USER;
4068         ua->old_mode = DLM_LOCK_IV;
4069
4070         if (error) {
4071                 __put_lkb(ls, lkb);
4072                 goto out;
4073         }
4074
4075         error = request_lock(ls, lkb, name, namelen, &args);
4076
4077         switch (error) {
4078         case 0:
4079                 break;
4080         case -EINPROGRESS:
4081                 error = 0;
4082                 break;
4083         case -EAGAIN:
4084                 error = 0;
4085                 /* fall through */
4086         default:
4087                 __put_lkb(ls, lkb);
4088                 goto out;
4089         }
4090
4091         /* add this new lkb to the per-process list of locks */
4092         spin_lock(&ua->proc->locks_spin);
4093         hold_lkb(lkb);
4094         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4095         spin_unlock(&ua->proc->locks_spin);
4096  out:
4097         unlock_recovery(ls);
4098         return error;
4099 }
4100
4101 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4102                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
4103 {
4104         struct dlm_lkb *lkb;
4105         struct dlm_args args;
4106         struct dlm_user_args *ua;
4107         int error;
4108
4109         lock_recovery(ls);
4110
4111         error = find_lkb(ls, lkid, &lkb);
4112         if (error)
4113                 goto out;
4114
4115         /* user can change the params on its lock when it converts it, or
4116            add an lvb that didn't exist before */
4117
4118         ua = (struct dlm_user_args *)lkb->lkb_astparam;
4119
4120         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4121                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4122                 if (!ua->lksb.sb_lvbptr) {
4123                         error = -ENOMEM;
4124                         goto out_put;
4125                 }
4126         }
4127         if (lvb_in && ua->lksb.sb_lvbptr)
4128                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4129
4130         ua->castparam = ua_tmp->castparam;
4131         ua->castaddr = ua_tmp->castaddr;
4132         ua->bastparam = ua_tmp->bastparam;
4133         ua->bastaddr = ua_tmp->bastaddr;
4134         ua->user_lksb = ua_tmp->user_lksb;
4135         ua->old_mode = lkb->lkb_grmode;
4136
4137         error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
4138                               ua, DLM_FAKE_USER_AST, &args);
4139         if (error)
4140                 goto out_put;
4141
4142         error = convert_lock(ls, lkb, &args);
4143
4144         if (error == -EINPROGRESS || error == -EAGAIN)
4145                 error = 0;
4146  out_put:
4147         dlm_put_lkb(lkb);
4148  out:
4149         unlock_recovery(ls);
4150         kfree(ua_tmp);
4151         return error;
4152 }
4153
4154 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4155                     uint32_t flags, uint32_t lkid, char *lvb_in)
4156 {
4157         struct dlm_lkb *lkb;
4158         struct dlm_args args;
4159         struct dlm_user_args *ua;
4160         int error;
4161
4162         lock_recovery(ls);
4163
4164         error = find_lkb(ls, lkid, &lkb);
4165         if (error)
4166                 goto out;
4167
4168         ua = (struct dlm_user_args *)lkb->lkb_astparam;
4169
4170         if (lvb_in && ua->lksb.sb_lvbptr)
4171                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4172         ua->castparam = ua_tmp->castparam;
4173         ua->user_lksb = ua_tmp->user_lksb;
4174
4175         error = set_unlock_args(flags, ua, &args);
4176         if (error)
4177                 goto out_put;
4178
4179         error = unlock_lock(ls, lkb, &args);
4180
4181         if (error == -DLM_EUNLOCK)
4182                 error = 0;
4183         /* from validate_unlock_args() */
4184         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4185                 error = 0;
4186         if (error)
4187                 goto out_put;
4188
4189         spin_lock(&ua->proc->locks_spin);
4190         /* dlm_user_add_ast() may have already taken lkb off the proc list */
4191         if (!list_empty(&lkb->lkb_ownqueue))
4192                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4193         spin_unlock(&ua->proc->locks_spin);
4194  out_put:
4195         dlm_put_lkb(lkb);
4196  out:
4197         unlock_recovery(ls);
4198         kfree(ua_tmp);
4199         return error;
4200 }
4201
4202 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4203                     uint32_t flags, uint32_t lkid)
4204 {
4205         struct dlm_lkb *lkb;
4206         struct dlm_args args;
4207         struct dlm_user_args *ua;
4208         int error;
4209
4210         lock_recovery(ls);
4211
4212         error = find_lkb(ls, lkid, &lkb);
4213         if (error)
4214                 goto out;
4215
4216         ua = (struct dlm_user_args *)lkb->lkb_astparam;
4217         ua->castparam = ua_tmp->castparam;
4218         ua->user_lksb = ua_tmp->user_lksb;
4219
4220         error = set_unlock_args(flags, ua, &args);
4221         if (error)
4222                 goto out_put;
4223
4224         error = cancel_lock(ls, lkb, &args);
4225
4226         if (error == -DLM_ECANCEL)
4227                 error = 0;
4228         /* from validate_unlock_args() */
4229         if (error == -EBUSY)
4230                 error = 0;
4231  out_put:
4232         dlm_put_lkb(lkb);
4233  out:
4234         unlock_recovery(ls);
4235         kfree(ua_tmp);
4236         return error;
4237 }
4238
4239 /* lkb's that are removed from the waiters list by revert are just left on the
4240    orphans list with the granted orphan locks, to be freed by purge */
4241
4242 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4243 {
4244         struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4245         struct dlm_args args;
4246         int error;
4247
4248         hold_lkb(lkb);
4249         mutex_lock(&ls->ls_orphans_mutex);
4250         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4251         mutex_unlock(&ls->ls_orphans_mutex);
4252
4253         set_unlock_args(0, ua, &args);
4254
4255         error = cancel_lock(ls, lkb, &args);
4256         if (error == -DLM_ECANCEL)
4257                 error = 0;
4258         return error;
4259 }
4260
4261 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4262    Regardless of what rsb queue the lock is on, it's removed and freed. */
4263
4264 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4265 {
4266         struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4267         struct dlm_args args;
4268         int error;
4269
4270         set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4271
4272         error = unlock_lock(ls, lkb, &args);
4273         if (error == -DLM_EUNLOCK)
4274                 error = 0;
4275         return error;
4276 }
4277
4278 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4279    (which does lock_rsb) due to deadlock with receiving a message that does
4280    lock_rsb followed by dlm_user_add_ast() */
4281
4282 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4283                                      struct dlm_user_proc *proc)
4284 {
4285         struct dlm_lkb *lkb = NULL;
4286
4287         mutex_lock(&ls->ls_clear_proc_locks);
4288         if (list_empty(&proc->locks))
4289                 goto out;
4290
4291         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4292         list_del_init(&lkb->lkb_ownqueue);
4293
4294         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4295                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4296         else
4297                 lkb->lkb_flags |= DLM_IFL_DEAD;
4298  out:
4299         mutex_unlock(&ls->ls_clear_proc_locks);
4300         return lkb;
4301 }
4302
4303 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4304    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4305    which we clear here. */
4306
4307 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4308    list, and no more device_writes should add lkb's to proc->locks list; so we
4309    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4310    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4311    them ourself. */
4312
4313 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4314 {
4315         struct dlm_lkb *lkb, *safe;
4316
4317         lock_recovery(ls);
4318
4319         while (1) {
4320                 lkb = del_proc_lock(ls, proc);
4321                 if (!lkb)
4322                         break;
4323                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4324                         orphan_proc_lock(ls, lkb);
4325                 else
4326                         unlock_proc_lock(ls, lkb);
4327
4328                 /* this removes the reference for the proc->locks list
4329                    added by dlm_user_request, it may result in the lkb
4330                    being freed */
4331
4332                 dlm_put_lkb(lkb);
4333         }
4334
4335         mutex_lock(&ls->ls_clear_proc_locks);
4336
4337         /* in-progress unlocks */
4338         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4339                 list_del_init(&lkb->lkb_ownqueue);
4340                 lkb->lkb_flags |= DLM_IFL_DEAD;
4341                 dlm_put_lkb(lkb);
4342         }
4343
4344         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4345                 list_del(&lkb->lkb_astqueue);
4346                 dlm_put_lkb(lkb);
4347         }
4348
4349         mutex_unlock(&ls->ls_clear_proc_locks);
4350         unlock_recovery(ls);
4351 }
4352
4353 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4354 {
4355         struct dlm_lkb *lkb, *safe;
4356
4357         while (1) {
4358                 lkb = NULL;
4359                 spin_lock(&proc->locks_spin);
4360                 if (!list_empty(&proc->locks)) {
4361                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
4362                                          lkb_ownqueue);
4363                         list_del_init(&lkb->lkb_ownqueue);
4364                 }
4365                 spin_unlock(&proc->locks_spin);
4366
4367                 if (!lkb)
4368                         break;
4369
4370                 lkb->lkb_flags |= DLM_IFL_DEAD;
4371                 unlock_proc_lock(ls, lkb);
4372                 dlm_put_lkb(lkb); /* ref from proc->locks list */
4373         }
4374
4375         spin_lock(&proc->locks_spin);
4376         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4377                 list_del_init(&lkb->lkb_ownqueue);
4378                 lkb->lkb_flags |= DLM_IFL_DEAD;
4379                 dlm_put_lkb(lkb);
4380         }
4381         spin_unlock(&proc->locks_spin);
4382
4383         spin_lock(&proc->asts_spin);
4384         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4385                 list_del(&lkb->lkb_astqueue);
4386                 dlm_put_lkb(lkb);
4387         }
4388         spin_unlock(&proc->asts_spin);
4389 }
4390
4391 /* pid of 0 means purge all orphans */
4392
4393 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4394 {
4395         struct dlm_lkb *lkb, *safe;
4396
4397         mutex_lock(&ls->ls_orphans_mutex);
4398         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4399                 if (pid && lkb->lkb_ownpid != pid)
4400                         continue;
4401                 unlock_proc_lock(ls, lkb);
4402                 list_del_init(&lkb->lkb_ownqueue);
4403                 dlm_put_lkb(lkb);
4404         }
4405         mutex_unlock(&ls->ls_orphans_mutex);
4406 }
4407
4408 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4409 {
4410         struct dlm_message *ms;
4411         struct dlm_mhandle *mh;
4412         int error;
4413
4414         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4415                                 DLM_MSG_PURGE, &ms, &mh);
4416         if (error)
4417                 return error;
4418         ms->m_nodeid = nodeid;
4419         ms->m_pid = pid;
4420
4421         return send_message(mh, ms);
4422 }
4423
4424 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4425                    int nodeid, int pid)
4426 {
4427         int error = 0;
4428
4429         if (nodeid != dlm_our_nodeid()) {
4430                 error = send_purge(ls, nodeid, pid);
4431         } else {
4432                 lock_recovery(ls);
4433                 if (pid == current->pid)
4434                         purge_proc_locks(ls, proc);
4435                 else
4436                         do_purge(ls, nodeid, pid);
4437                 unlock_recovery(ls);
4438         }
4439         return error;
4440 }
4441