// SPDX-License-Identifier: GPL-2.0

#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/slab.h>
#include <asm/div64.h>

#include <linux/ceph/libceph.h>
#include <linux/ceph/osdmap.h>
#include <linux/ceph/decode.h>
#include <linux/crush/hash.h>
#include <linux/crush/mapper.h>

char *ceph_osdmap_state_str(char *str, int len, u32 state)
{
	if (!len)
		return str;

	if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP))
		snprintf(str, len, "exists, up");
	else if (state & CEPH_OSD_EXISTS)
		snprintf(str, len, "exists");
	else if (state & CEPH_OSD_UP)
		snprintf(str, len, "up");
	else
		snprintf(str, len, "doesn't exist");

	return str;
}

static int calc_bits_of(unsigned int t)
{
	int b = 0;

	while (t) {
		t = t >> 1;
		b++;
	}
	return b;
}

/*
 * the foo_mask is the smallest value 2^n-1 that is >= foo.
 */
static void calc_pg_masks(struct ceph_pg_pool_info *pi)
{
	pi->pg_num_mask = (1 << calc_bits_of(pi->pg_num-1)) - 1;
	pi->pgp_num_mask = (1 << calc_bits_of(pi->pgp_num-1)) - 1;
}

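/*
 * Worked example (illustrative numbers): for pg_num = 12,
 * calc_bits_of(12 - 1) = calc_bits_of(11) = 4, so
 * pg_num_mask = (1 << 4) - 1 = 15, the smallest 2^n-1 value that
 * is >= 12.
 */
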
static int crush_decode_uniform_bucket(void **p, void *end,
				       struct crush_bucket_uniform *b)
{
	dout("crush_decode_uniform_bucket %p to %p\n", *p, end);
	ceph_decode_need(p, end, (1+b->h.size) * sizeof(u32), bad);
	b->item_weight = ceph_decode_32(p);
	return 0;
bad:
	return -EINVAL;
}

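/*
 * All of the bucket decoders in this file follow the same pattern:
 * ceph_decode_need() checks that at least the requested number of
 * bytes remain between *p and end and jumps to the "bad" label if
 * they don't, after which the ceph_decode_*() accessors can consume
 * the input without further bounds checking.
 */
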
static int crush_decode_list_bucket(void **p, void *end,
				    struct crush_bucket_list *b)
{
	int j;

	dout("crush_decode_list_bucket %p to %p\n", *p, end);
	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
	if (b->item_weights == NULL)
		return -ENOMEM;
	b->sum_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
	if (b->sum_weights == NULL)
		return -ENOMEM;
	ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
	for (j = 0; j < b->h.size; j++) {
		b->item_weights[j] = ceph_decode_32(p);
		b->sum_weights[j] = ceph_decode_32(p);
	}
	return 0;
bad:
	return -EINVAL;
}

static int crush_decode_tree_bucket(void **p, void *end,
				    struct crush_bucket_tree *b)
{
	int j;

	dout("crush_decode_tree_bucket %p to %p\n", *p, end);
	ceph_decode_8_safe(p, end, b->num_nodes, bad);
	b->node_weights = kcalloc(b->num_nodes, sizeof(u32), GFP_NOFS);
	if (b->node_weights == NULL)
		return -ENOMEM;
	ceph_decode_need(p, end, b->num_nodes * sizeof(u32), bad);
	for (j = 0; j < b->num_nodes; j++)
		b->node_weights[j] = ceph_decode_32(p);
	return 0;
bad:
	return -EINVAL;
}

static int crush_decode_straw_bucket(void **p, void *end,
				     struct crush_bucket_straw *b)
{
	int j;

	dout("crush_decode_straw_bucket %p to %p\n", *p, end);
	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
	if (b->item_weights == NULL)
		return -ENOMEM;
	b->straws = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
	if (b->straws == NULL)
		return -ENOMEM;
	ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
	for (j = 0; j < b->h.size; j++) {
		b->item_weights[j] = ceph_decode_32(p);
		b->straws[j] = ceph_decode_32(p);
	}
	return 0;
bad:
	return -EINVAL;
}

static int crush_decode_straw2_bucket(void **p, void *end,
				      struct crush_bucket_straw2 *b)
{
	int j;

	dout("crush_decode_straw2_bucket %p to %p\n", *p, end);
	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
	if (b->item_weights == NULL)
		return -ENOMEM;
	ceph_decode_need(p, end, b->h.size * sizeof(u32), bad);
	for (j = 0; j < b->h.size; j++)
		b->item_weights[j] = ceph_decode_32(p);
	return 0;
bad:
	return -EINVAL;
}

static struct crush_choose_arg_map *alloc_choose_arg_map(void)
{
	struct crush_choose_arg_map *arg_map;

	arg_map = kzalloc(sizeof(*arg_map), GFP_NOIO);
	if (!arg_map)
		return NULL;

	RB_CLEAR_NODE(&arg_map->node);
	return arg_map;
}

static void free_choose_arg_map(struct crush_choose_arg_map *arg_map)
{
	if (arg_map) {
		int i, j;

		WARN_ON(!RB_EMPTY_NODE(&arg_map->node));

		for (i = 0; i < arg_map->size; i++) {
			struct crush_choose_arg *arg = &arg_map->args[i];

			for (j = 0; j < arg->weight_set_size; j++)
				kfree(arg->weight_set[j].weights);
			kfree(arg->weight_set);
			kfree(arg->ids);
		}
		kfree(arg_map->args);
		kfree(arg_map);
	}
}

DEFINE_RB_FUNCS(choose_arg_map, struct crush_choose_arg_map,
		choose_args_index, node)

void clear_choose_args(struct crush_map *c)
{
	while (!RB_EMPTY_ROOT(&c->choose_args)) {
		struct crush_choose_arg_map *arg_map =
		    rb_entry(rb_first(&c->choose_args),
			     struct crush_choose_arg_map, node);

		erase_choose_arg_map(&c->choose_args, arg_map);
		free_choose_arg_map(arg_map);
	}
}

static u32 *decode_array_32_alloc(void **p, void *end, u32 *plen)
{
	u32 *a = NULL;
	u32 len;
	int ret = -EINVAL;

	ceph_decode_32_safe(p, end, len, e_inval);
	if (len) {
		u32 i;

		a = kmalloc_array(len, sizeof(u32), GFP_NOIO);
		if (!a) {
			ret = -ENOMEM;
			goto fail;
		}

		ceph_decode_need(p, end, len * sizeof(u32), e_inval);
		for (i = 0; i < len; i++)
			a[i] = ceph_decode_32(p);
	}

	*plen = len;
	return a;

e_inval:
fail:
	kfree(a);
	return ERR_PTR(ret);
}

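/*
 * Callers must check the result with IS_ERR() rather than comparing
 * against NULL: a zero-length array is reported as a NULL pointer
 * with *plen == 0, which is a valid result.  A minimal caller sketch
 * (hypothetical variable names):
 *
 *	u32 *vals, n;
 *
 *	vals = decode_array_32_alloc(p, end, &n);
 *	if (IS_ERR(vals))
 *		return PTR_ERR(vals);
 */
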
/*
 * Assumes @arg is zero-initialized.
 */
static int decode_choose_arg(void **p, void *end, struct crush_choose_arg *arg)
{
	int ret;

	ceph_decode_32_safe(p, end, arg->weight_set_size, e_inval);
	if (arg->weight_set_size) {
		u32 i;

		arg->weight_set = kmalloc_array(arg->weight_set_size,
						sizeof(*arg->weight_set),
						GFP_NOIO);
		if (!arg->weight_set)
			return -ENOMEM;

		for (i = 0; i < arg->weight_set_size; i++) {
			struct crush_weight_set *w = &arg->weight_set[i];

			w->weights = decode_array_32_alloc(p, end, &w->size);
			if (IS_ERR(w->weights)) {
				ret = PTR_ERR(w->weights);
				w->weights = NULL;
				return ret;
			}
		}
	}

	arg->ids = decode_array_32_alloc(p, end, &arg->ids_size);
	if (IS_ERR(arg->ids)) {
		ret = PTR_ERR(arg->ids);
		arg->ids = NULL;
		return ret;
	}

	return 0;

e_inval:
	return -EINVAL;
}

static int decode_choose_args(void **p, void *end, struct crush_map *c)
{
	struct crush_choose_arg_map *arg_map = NULL;
	u32 num_choose_arg_maps, num_buckets;
	int ret;

	ceph_decode_32_safe(p, end, num_choose_arg_maps, e_inval);
	while (num_choose_arg_maps--) {
		arg_map = alloc_choose_arg_map();
		if (!arg_map) {
			ret = -ENOMEM;
			goto fail;
		}

		ceph_decode_64_safe(p, end, arg_map->choose_args_index,
				    e_inval);
		arg_map->size = c->max_buckets;
		arg_map->args = kcalloc(arg_map->size, sizeof(*arg_map->args),
					GFP_NOIO);
		if (!arg_map->args) {
			ret = -ENOMEM;
			goto fail;
		}

		ceph_decode_32_safe(p, end, num_buckets, e_inval);
		while (num_buckets--) {
			struct crush_choose_arg *arg;
			u32 bucket_index;

			ceph_decode_32_safe(p, end, bucket_index, e_inval);
			if (bucket_index >= arg_map->size)
				goto e_inval;

			arg = &arg_map->args[bucket_index];
			ret = decode_choose_arg(p, end, arg);
			if (ret)
				goto fail;

			if (arg->ids_size &&
			    arg->ids_size != c->buckets[bucket_index]->size)
				goto e_inval;
		}

		insert_choose_arg_map(&c->choose_args, arg_map);
	}

	return 0;

e_inval:
	ret = -EINVAL;
fail:
	free_choose_arg_map(arg_map);
	return ret;
}

static void crush_finalize(struct crush_map *c)
{
	__s32 b;

	/* Space for the array of pointers to per-bucket workspace */
	c->working_size = sizeof(struct crush_work) +
	    c->max_buckets * sizeof(struct crush_work_bucket *);

	for (b = 0; b < c->max_buckets; b++) {
		if (!c->buckets[b])
			continue;

		switch (c->buckets[b]->alg) {
		default:
			/*
			 * The base case, permutation variables and
			 * the pointer to the permutation array.
			 */
			c->working_size += sizeof(struct crush_work_bucket);
			break;
		}
		/* Every bucket has a permutation array. */
		c->working_size += c->buckets[b]->size * sizeof(__u32);
	}
}

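/*
 * Worked example (illustrative numbers): a map with max_buckets = 4,
 * each bucket holding 2 items, needs sizeof(struct crush_work) +
 * 4 * sizeof(struct crush_work_bucket *) bytes up front, plus
 * 4 * sizeof(struct crush_work_bucket) of per-bucket state, plus
 * 4 * 2 * sizeof(__u32) for the permutation arrays.
 */
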
static struct crush_map *crush_decode(void *pbyval, void *end)
{
	struct crush_map *c;
	int err;
	int i, j;
	void **p = &pbyval;
	void *start = pbyval;
	u32 magic;

	dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p));

	c = kzalloc(sizeof(*c), GFP_NOFS);
	if (c == NULL)
		return ERR_PTR(-ENOMEM);

	c->choose_args = RB_ROOT;

	/* set tunables to default values */
	c->choose_local_tries = 2;
	c->choose_local_fallback_tries = 5;
	c->choose_total_tries = 19;
	c->chooseleaf_descend_once = 0;

	ceph_decode_need(p, end, 4*sizeof(u32), bad);
	magic = ceph_decode_32(p);
	if (magic != CRUSH_MAGIC) {
		pr_err("crush_decode magic %x != current %x\n",
		       (unsigned int)magic, (unsigned int)CRUSH_MAGIC);
		goto bad;
	}
	c->max_buckets = ceph_decode_32(p);
	c->max_rules = ceph_decode_32(p);
	c->max_devices = ceph_decode_32(p);

	c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS);
	if (c->buckets == NULL)
		goto badmem;
	c->rules = kcalloc(c->max_rules, sizeof(*c->rules), GFP_NOFS);
	if (c->rules == NULL)
		goto badmem;

	/* buckets */
	for (i = 0; i < c->max_buckets; i++) {
		int size = 0;
		u32 alg;
		struct crush_bucket *b;

		ceph_decode_32_safe(p, end, alg, bad);
		if (alg == 0) {
			c->buckets[i] = NULL;
			continue;
		}
		dout("crush_decode bucket %d off %x %p to %p\n",
		     i, (int)(*p-start), *p, end);

		switch (alg) {
		case CRUSH_BUCKET_UNIFORM:
			size = sizeof(struct crush_bucket_uniform);
			break;
		case CRUSH_BUCKET_LIST:
			size = sizeof(struct crush_bucket_list);
			break;
		case CRUSH_BUCKET_TREE:
			size = sizeof(struct crush_bucket_tree);
			break;
		case CRUSH_BUCKET_STRAW:
			size = sizeof(struct crush_bucket_straw);
			break;
		case CRUSH_BUCKET_STRAW2:
			size = sizeof(struct crush_bucket_straw2);
			break;
		default:
			goto bad;
		}
		BUG_ON(size == 0);
		b = c->buckets[i] = kzalloc(size, GFP_NOFS);
		if (b == NULL)
			goto badmem;

		ceph_decode_need(p, end, 4*sizeof(u32), bad);
		b->id = ceph_decode_32(p);
		b->type = ceph_decode_16(p);
		b->alg = ceph_decode_8(p);
		b->hash = ceph_decode_8(p);
		b->weight = ceph_decode_32(p);
		b->size = ceph_decode_32(p);

		dout("crush_decode bucket size %d off %x %p to %p\n",
		     b->size, (int)(*p-start), *p, end);

		b->items = kcalloc(b->size, sizeof(__s32), GFP_NOFS);
		if (b->items == NULL)
			goto badmem;

		ceph_decode_need(p, end, b->size*sizeof(u32), bad);
		for (j = 0; j < b->size; j++)
			b->items[j] = ceph_decode_32(p);

		switch (b->alg) {
		case CRUSH_BUCKET_UNIFORM:
			err = crush_decode_uniform_bucket(p, end,
				  (struct crush_bucket_uniform *)b);
			if (err < 0)
				goto fail;
			break;
		case CRUSH_BUCKET_LIST:
			err = crush_decode_list_bucket(p, end,
			       (struct crush_bucket_list *)b);
			if (err < 0)
				goto fail;
			break;
		case CRUSH_BUCKET_TREE:
			err = crush_decode_tree_bucket(p, end,
				(struct crush_bucket_tree *)b);
			if (err < 0)
				goto fail;
			break;
		case CRUSH_BUCKET_STRAW:
			err = crush_decode_straw_bucket(p, end,
				 (struct crush_bucket_straw *)b);
			if (err < 0)
				goto fail;
			break;
		case CRUSH_BUCKET_STRAW2:
			err = crush_decode_straw2_bucket(p, end,
				  (struct crush_bucket_straw2 *)b);
			if (err < 0)
				goto fail;
			break;
		}
	}

	/* rules */
	dout("rule vec is %p\n", c->rules);
	for (i = 0; i < c->max_rules; i++) {
		u32 yes;
		struct crush_rule *r;

		ceph_decode_32_safe(p, end, yes, bad);
		if (!yes) {
			dout("crush_decode NO rule %d off %x %p to %p\n",
			     i, (int)(*p-start), *p, end);
			c->rules[i] = NULL;
			continue;
		}

		dout("crush_decode rule %d off %x %p to %p\n",
		     i, (int)(*p-start), *p, end);

		/* len */
		ceph_decode_32_safe(p, end, yes, bad);
#if BITS_PER_LONG == 32
		if (yes > (ULONG_MAX - sizeof(*r))
			  / sizeof(struct crush_rule_step))
			goto bad;
#endif
		r = c->rules[i] = kmalloc(sizeof(*r) +
					  yes*sizeof(struct crush_rule_step),
					  GFP_NOFS);
		if (r == NULL)
			goto badmem;
		dout(" rule %d is at %p\n", i, r);
		r->len = yes;
		ceph_decode_copy_safe(p, end, &r->mask, 4, bad); /* 4 u8's */
		ceph_decode_need(p, end, r->len*3*sizeof(u32), bad);
		for (j = 0; j < r->len; j++) {
			r->steps[j].op = ceph_decode_32(p);
			r->steps[j].arg1 = ceph_decode_32(p);
			r->steps[j].arg2 = ceph_decode_32(p);
		}
	}

	ceph_decode_skip_map(p, end, 32, string, bad); /* type_map */
	ceph_decode_skip_map(p, end, 32, string, bad); /* name_map */
	ceph_decode_skip_map(p, end, 32, string, bad); /* rule_name_map */

	/* tunables */
	ceph_decode_need(p, end, 3*sizeof(u32), done);
	c->choose_local_tries = ceph_decode_32(p);
	c->choose_local_fallback_tries = ceph_decode_32(p);
	c->choose_total_tries = ceph_decode_32(p);
	dout("crush decode tunable choose_local_tries = %d\n",
	     c->choose_local_tries);
	dout("crush decode tunable choose_local_fallback_tries = %d\n",
	     c->choose_local_fallback_tries);
	dout("crush decode tunable choose_total_tries = %d\n",
	     c->choose_total_tries);

	ceph_decode_need(p, end, sizeof(u32), done);
	c->chooseleaf_descend_once = ceph_decode_32(p);
	dout("crush decode tunable chooseleaf_descend_once = %d\n",
	     c->chooseleaf_descend_once);

	ceph_decode_need(p, end, sizeof(u8), done);
	c->chooseleaf_vary_r = ceph_decode_8(p);
	dout("crush decode tunable chooseleaf_vary_r = %d\n",
	     c->chooseleaf_vary_r);

	/* skip straw_calc_version, allowed_bucket_algs */
	ceph_decode_need(p, end, sizeof(u8) + sizeof(u32), done);
	*p += sizeof(u8) + sizeof(u32);

	ceph_decode_need(p, end, sizeof(u8), done);
	c->chooseleaf_stable = ceph_decode_8(p);
	dout("crush decode tunable chooseleaf_stable = %d\n",
	     c->chooseleaf_stable);

	if (*p != end) {
		/* class_map */
		ceph_decode_skip_map(p, end, 32, 32, bad);
		/* class_name */
		ceph_decode_skip_map(p, end, 32, string, bad);
		/* class_bucket */
		ceph_decode_skip_map_of_map(p, end, 32, 32, 32, bad);
	}

	if (*p != end) {
		err = decode_choose_args(p, end, c);
		if (err)
			goto fail;
	}

done:
	crush_finalize(c);
	dout("crush_decode success\n");
	return c;

badmem:
	err = -ENOMEM;
fail:
	dout("crush_decode fail %d\n", err);
	crush_destroy(c);
	return ERR_PTR(err);

bad:
	err = -EINVAL;
	goto fail;
}

int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs)
{
	if (lhs->pool < rhs->pool)
		return -1;
	if (lhs->pool > rhs->pool)
		return 1;
	if (lhs->seed < rhs->seed)
		return -1;
	if (lhs->seed > rhs->seed)
		return 1;

	return 0;
}

int ceph_spg_compare(const struct ceph_spg *lhs, const struct ceph_spg *rhs)
{
	int ret;

	ret = ceph_pg_compare(&lhs->pgid, &rhs->pgid);
	if (ret)
		return ret;

	if (lhs->shard < rhs->shard)
		return -1;
	if (lhs->shard > rhs->shard)
		return 1;

	return 0;
}

static struct ceph_pg_mapping *alloc_pg_mapping(size_t payload_len)
{
	struct ceph_pg_mapping *pg;

	pg = kmalloc(sizeof(*pg) + payload_len, GFP_NOIO);
	if (!pg)
		return NULL;

	RB_CLEAR_NODE(&pg->node);
	return pg;
}

static void free_pg_mapping(struct ceph_pg_mapping *pg)
{
	WARN_ON(!RB_EMPTY_NODE(&pg->node));

	kfree(pg);
}

/*
 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
 * to a set of osds) and primary_temp (explicit primary setting)
 */
DEFINE_RB_FUNCS2(pg_mapping, struct ceph_pg_mapping, pgid, ceph_pg_compare,
		 RB_BYPTR, const struct ceph_pg *, node)

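/*
 * DEFINE_RB_FUNCS2() generates the lookup_pg_mapping(),
 * insert_pg_mapping() and erase_pg_mapping() helpers used below,
 * keyed by a struct ceph_pg pointer and ordered by ceph_pg_compare().
 */
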
/*
 * rbtree of pg pool info
 */
static int __insert_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *new)
{
	struct rb_node **p = &root->rb_node;
	struct rb_node *parent = NULL;
	struct ceph_pg_pool_info *pi = NULL;

	while (*p) {
		parent = *p;
		pi = rb_entry(parent, struct ceph_pg_pool_info, node);
		if (new->id < pi->id)
			p = &(*p)->rb_left;
		else if (new->id > pi->id)
			p = &(*p)->rb_right;
		else
			return -EEXIST;
	}

	rb_link_node(&new->node, parent, p);
	rb_insert_color(&new->node, root);
	return 0;
}

static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, u64 id)
{
	struct ceph_pg_pool_info *pi;
	struct rb_node *n = root->rb_node;

	while (n) {
		pi = rb_entry(n, struct ceph_pg_pool_info, node);
		if (id < pi->id)
			n = n->rb_left;
		else if (id > pi->id)
			n = n->rb_right;
		else
			return pi;
	}
	return NULL;
}

struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id)
{
	return __lookup_pg_pool(&map->pg_pools, id);
}

const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
{
	struct ceph_pg_pool_info *pi;

	if (id == CEPH_NOPOOL)
		return NULL;

	if (WARN_ON_ONCE(id > (u64) INT_MAX))
		return NULL;

	pi = __lookup_pg_pool(&map->pg_pools, (int) id);

	return pi ? pi->name : NULL;
}
EXPORT_SYMBOL(ceph_pg_pool_name_by_id);

int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
{
	struct rb_node *rbp;

	for (rbp = rb_first(&map->pg_pools); rbp; rbp = rb_next(rbp)) {
		struct ceph_pg_pool_info *pi =
			rb_entry(rbp, struct ceph_pg_pool_info, node);
		if (pi->name && strcmp(pi->name, name) == 0)
			return pi->id;
	}
	return -ENOENT;
}
EXPORT_SYMBOL(ceph_pg_poolid_by_name);

static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
{
	rb_erase(&pi->node, root);
	kfree(pi->name);
	kfree(pi);
}

static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
{
	u8 ev, cv;
	unsigned len, num;
	void *pool_end;

	ceph_decode_need(p, end, 2 + 4, bad);
	ev = ceph_decode_8(p);  /* encoding version */
	cv = ceph_decode_8(p);  /* compat version */
	if (ev < 5) {
		pr_warn("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
		return -EINVAL;
	}
	if (cv > 9) {
		pr_warn("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv);
		return -EINVAL;
	}
	len = ceph_decode_32(p);
	ceph_decode_need(p, end, len, bad);
	pool_end = *p + len;

	pi->type = ceph_decode_8(p);
	pi->size = ceph_decode_8(p);
	pi->crush_ruleset = ceph_decode_8(p);
	pi->object_hash = ceph_decode_8(p);

	pi->pg_num = ceph_decode_32(p);
	pi->pgp_num = ceph_decode_32(p);

	*p += 4 + 4;  /* skip lpg* */
	*p += 4;      /* skip last_change */
	*p += 8 + 4;  /* skip snap_seq, snap_epoch */

	/* skip snaps */
	num = ceph_decode_32(p);
	while (num--) {
		*p += 8;      /* snapid key */
		*p += 1 + 1;  /* versions */
		len = ceph_decode_32(p);
		*p += len;
	}

	/* skip removed_snaps */
	num = ceph_decode_32(p);
	*p += num * (8 + 8);

	*p += 8;  /* skip auid */
	pi->flags = ceph_decode_64(p);
	*p += 4;  /* skip crash_replay_interval */

	if (ev >= 7)
		pi->min_size = ceph_decode_8(p);
	else
		pi->min_size = pi->size - pi->size / 2;

	if (ev >= 8)
		*p += 8 + 8;  /* skip quota_max_* */

	if (ev >= 9) {
		/* skip tiers */
		num = ceph_decode_32(p);
		*p += num * 8;

		*p += 8;  /* skip tier_of */
		*p += 1;  /* skip cache_mode */

		pi->read_tier = ceph_decode_64(p);
		pi->write_tier = ceph_decode_64(p);
	} else {
		pi->read_tier = -1;
		pi->write_tier = -1;
	}

	if (ev >= 10) {
		/* skip properties */
		num = ceph_decode_32(p);
		while (num--) {
			len = ceph_decode_32(p);
			*p += len; /* key */
			len = ceph_decode_32(p);
			*p += len; /* val */
		}
	}

	if (ev >= 11) {
		/* skip hit_set_params */
		*p += 1 + 1; /* versions */
		len = ceph_decode_32(p);
		*p += len;

		*p += 4; /* skip hit_set_period */
		*p += 4; /* skip hit_set_count */
	}

	if (ev >= 12)
		*p += 4; /* skip stripe_width */

	if (ev >= 13) {
		*p += 8; /* skip target_max_bytes */
		*p += 8; /* skip target_max_objects */
		*p += 4; /* skip cache_target_dirty_ratio_micro */
		*p += 4; /* skip cache_target_full_ratio_micro */
		*p += 4; /* skip cache_min_flush_age */
		*p += 4; /* skip cache_min_evict_age */
	}

	if (ev >= 14) {
		/* skip erasure_code_profile */
		len = ceph_decode_32(p);
		*p += len;
	}

	/*
	 * last_force_op_resend_preluminous, will be overridden if the
	 * map was encoded with RESEND_ON_SPLIT
	 */
	if (ev >= 15)
		pi->last_force_request_resend = ceph_decode_32(p);
	else
		pi->last_force_request_resend = 0;

	if (ev >= 16)
		*p += 4; /* skip min_read_recency_for_promote */

	if (ev >= 17)
		*p += 8; /* skip expected_num_objects */

	if (ev >= 19)
		*p += 4; /* skip cache_target_dirty_high_ratio_micro */

	if (ev >= 20)
		*p += 4; /* skip min_write_recency_for_promote */

	if (ev >= 21)
		*p += 1; /* skip use_gmt_hitset */

	if (ev >= 22)
		*p += 1; /* skip fast_read */

	if (ev >= 23) {
		*p += 4; /* skip hit_set_grade_decay_rate */
		*p += 4; /* skip hit_set_search_last_n */
	}

	if (ev >= 24) {
		/* skip opts */
		*p += 1 + 1; /* versions */
		len = ceph_decode_32(p);
		*p += len;
	}

	if (ev >= 25)
		pi->last_force_request_resend = ceph_decode_32(p);

	/* ignore the rest */

	*p = pool_end;
	calc_pg_masks(pi);
	return 0;

bad:
	return -EINVAL;
}

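/*
 * Worked example for the pre-v7 min_size default above (illustrative
 * numbers): a pool with size = 3 gets min_size = 3 - 3 / 2 = 2, i.e.
 * a majority of the replicas.
 */
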
static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
{
	struct ceph_pg_pool_info *pi;
	u32 num, len;
	u64 pool;

	ceph_decode_32_safe(p, end, num, bad);
	dout(" %d pool names\n", num);
	while (num--) {
		ceph_decode_64_safe(p, end, pool, bad);
		ceph_decode_32_safe(p, end, len, bad);
		dout(" pool %llu len %d\n", pool, len);
		ceph_decode_need(p, end, len, bad);
		pi = __lookup_pg_pool(&map->pg_pools, pool);
		if (pi) {
			char *name = kstrndup(*p, len, GFP_NOFS);

			if (!name)
				return -ENOMEM;
			kfree(pi->name);
			pi->name = name;
			dout(" name is %s\n", pi->name);
		}
		*p += len;
	}
	return 0;

bad:
	return -EINVAL;
}

struct ceph_osdmap *ceph_osdmap_alloc(void)
{
	struct ceph_osdmap *map;

	map = kzalloc(sizeof(*map), GFP_NOIO);
	if (!map)
		return NULL;

	map->pg_pools = RB_ROOT;
	map->pool_max = -1;
	map->pg_temp = RB_ROOT;
	map->primary_temp = RB_ROOT;
	map->pg_upmap = RB_ROOT;
	map->pg_upmap_items = RB_ROOT;
	mutex_init(&map->crush_workspace_mutex);

	return map;
}

void ceph_osdmap_destroy(struct ceph_osdmap *map)
{
	dout("osdmap_destroy %p\n", map);

	if (map->crush)
		crush_destroy(map->crush);
	while (!RB_EMPTY_ROOT(&map->pg_temp)) {
		struct ceph_pg_mapping *pg =
			rb_entry(rb_first(&map->pg_temp),
				 struct ceph_pg_mapping, node);

		erase_pg_mapping(&map->pg_temp, pg);
		free_pg_mapping(pg);
	}
	while (!RB_EMPTY_ROOT(&map->primary_temp)) {
		struct ceph_pg_mapping *pg =
			rb_entry(rb_first(&map->primary_temp),
				 struct ceph_pg_mapping, node);

		erase_pg_mapping(&map->primary_temp, pg);
		free_pg_mapping(pg);
	}
	while (!RB_EMPTY_ROOT(&map->pg_upmap)) {
		struct ceph_pg_mapping *pg =
			rb_entry(rb_first(&map->pg_upmap),
				 struct ceph_pg_mapping, node);

		rb_erase(&pg->node, &map->pg_upmap);
		kfree(pg);
	}
	while (!RB_EMPTY_ROOT(&map->pg_upmap_items)) {
		struct ceph_pg_mapping *pg =
			rb_entry(rb_first(&map->pg_upmap_items),
				 struct ceph_pg_mapping, node);

		rb_erase(&pg->node, &map->pg_upmap_items);
		kfree(pg);
	}
	while (!RB_EMPTY_ROOT(&map->pg_pools)) {
		struct ceph_pg_pool_info *pi =
			rb_entry(rb_first(&map->pg_pools),
				 struct ceph_pg_pool_info, node);

		__remove_pg_pool(&map->pg_pools, pi);
	}
	kfree(map->osd_state);
	kfree(map->osd_weight);
	kfree(map->osd_addr);
	kfree(map->osd_primary_affinity);
	kfree(map->crush_workspace);
	kfree(map);
}

/*
 * Adjust max_osd value, (re)allocate arrays.
 *
 * The new elements are properly initialized.
 */
static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
{
	u32 *state;
	u32 *weight;
	struct ceph_entity_addr *addr;
	int i;

	state = krealloc(map->osd_state, max*sizeof(*state), GFP_NOFS);
	if (!state)
		return -ENOMEM;
	map->osd_state = state;

	weight = krealloc(map->osd_weight, max*sizeof(*weight), GFP_NOFS);
	if (!weight)
		return -ENOMEM;
	map->osd_weight = weight;

	addr = krealloc(map->osd_addr, max*sizeof(*addr), GFP_NOFS);
	if (!addr)
		return -ENOMEM;
	map->osd_addr = addr;

	for (i = map->max_osd; i < max; i++) {
		map->osd_state[i] = 0;
		map->osd_weight[i] = CEPH_OSD_OUT;
		memset(map->osd_addr + i, 0, sizeof(*map->osd_addr));
	}

	if (map->osd_primary_affinity) {
		u32 *affinity;

		affinity = krealloc(map->osd_primary_affinity,
				    max*sizeof(*affinity), GFP_NOFS);
		if (!affinity)
			return -ENOMEM;
		map->osd_primary_affinity = affinity;

		for (i = map->max_osd; i < max; i++)
			map->osd_primary_affinity[i] =
			    CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
	}

	map->max_osd = max;

	return 0;
}

static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
{
	void *workspace;
	size_t work_size;

	if (IS_ERR(crush))
		return PTR_ERR(crush);

	work_size = crush_work_size(crush, CEPH_PG_MAX_SIZE);
	dout("%s work_size %zu bytes\n", __func__, work_size);
	workspace = kmalloc(work_size, GFP_NOIO);
	if (!workspace) {
		crush_destroy(crush);
		return -ENOMEM;
	}
	crush_init_workspace(crush, workspace);

	if (map->crush)
		crush_destroy(map->crush);
	kfree(map->crush_workspace);
	map->crush = crush;
	map->crush_workspace = workspace;
	return 0;
}

#define OSDMAP_WRAPPER_COMPAT_VER	7
#define OSDMAP_CLIENT_DATA_COMPAT_VER	1

/*
 * Return 0 or error.  On success, *v is set to 0 for old (v6) osdmaps,
 * to struct_v of the client_data section for new (v7 and above)
 * osdmaps.
 */
static int get_osdmap_client_data_v(void **p, void *end,
				    const char *prefix, u8 *v)
{
	u8 struct_v;

	ceph_decode_8_safe(p, end, struct_v, e_inval);
	if (struct_v >= 7) {
		u8 struct_compat;

		ceph_decode_8_safe(p, end, struct_compat, e_inval);
		if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) {
			pr_warn("got v %d cv %d > %d of %s ceph_osdmap\n",
				struct_v, struct_compat,
				OSDMAP_WRAPPER_COMPAT_VER, prefix);
			return -EINVAL;
		}
		*p += 4; /* ignore wrapper struct_len */

		ceph_decode_8_safe(p, end, struct_v, e_inval);
		ceph_decode_8_safe(p, end, struct_compat, e_inval);
		if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) {
			pr_warn("got v %d cv %d > %d of %s ceph_osdmap client data\n",
				struct_v, struct_compat,
				OSDMAP_CLIENT_DATA_COMPAT_VER, prefix);
			return -EINVAL;
		}
		*p += 4; /* ignore client data struct_len */
	} else {
		u16 version;

		*p -= 1;
		ceph_decode_16_safe(p, end, version, e_inval);
		if (version < 6) {
			pr_warn("got v %d < 6 of %s ceph_osdmap\n",
				version, prefix);
			return -EINVAL;
		}

		/* old osdmap encoding */
		struct_v = 0;
	}

	*v = struct_v;
	return 0;

e_inval:
	return -EINVAL;
}

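/*
 * In other words, a v7+ osdmap is framed as an outer wrapper
 * (struct_v, struct_compat, struct_len) around an inner client_data
 * section carrying its own (struct_v, struct_compat, struct_len)
 * triplet; only the client data struct_v is reported to the caller.
 */
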
static int __decode_pools(void **p, void *end, struct ceph_osdmap *map,
			  bool incremental)
{
	u32 n;

	ceph_decode_32_safe(p, end, n, e_inval);
	while (n--) {
		struct ceph_pg_pool_info *pi;
		u64 pool;
		int ret;

		ceph_decode_64_safe(p, end, pool, e_inval);

		pi = __lookup_pg_pool(&map->pg_pools, pool);
		if (!incremental || !pi) {
			pi = kzalloc(sizeof(*pi), GFP_NOFS);
			if (!pi)
				return -ENOMEM;

			pi->id = pool;

			ret = __insert_pg_pool(&map->pg_pools, pi);
			if (ret) {
				kfree(pi);
				return ret;
			}
		}

		ret = decode_pool(p, end, pi);
		if (ret)
			return ret;
	}

	return 0;

e_inval:
	return -EINVAL;
}

static int decode_pools(void **p, void *end, struct ceph_osdmap *map)
{
	return __decode_pools(p, end, map, false);
}

static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map)
{
	return __decode_pools(p, end, map, true);
}

typedef struct ceph_pg_mapping *(*decode_mapping_fn_t)(void **, void *, bool);

static int decode_pg_mapping(void **p, void *end, struct rb_root *mapping_root,
			     decode_mapping_fn_t fn, bool incremental)
{
	u32 n;

	WARN_ON(!incremental && !fn);

	ceph_decode_32_safe(p, end, n, e_inval);
	while (n--) {
		struct ceph_pg_mapping *pg;
		struct ceph_pg pgid;
		int ret;

		ret = ceph_decode_pgid(p, end, &pgid);
		if (ret)
			return ret;

		pg = lookup_pg_mapping(mapping_root, &pgid);
		if (pg) {
			WARN_ON(!incremental);
			erase_pg_mapping(mapping_root, pg);
			free_pg_mapping(pg);
		}

		if (fn) {
			pg = fn(p, end, incremental);
			if (IS_ERR(pg))
				return PTR_ERR(pg);

			if (pg) {
				pg->pgid = pgid; /* struct */
				insert_pg_mapping(mapping_root, pg);
			}
		}
	}

	return 0;

e_inval:
	return -EINVAL;
}

static struct ceph_pg_mapping *__decode_pg_temp(void **p, void *end,
						bool incremental)
{
	struct ceph_pg_mapping *pg;
	u32 len, i;

	ceph_decode_32_safe(p, end, len, e_inval);
	if (len == 0 && incremental)
		return NULL;	/* new_pg_temp: [] to remove */
	if (len > (SIZE_MAX - sizeof(*pg)) / sizeof(u32))
		return ERR_PTR(-EINVAL);

	ceph_decode_need(p, end, len * sizeof(u32), e_inval);
	pg = alloc_pg_mapping(len * sizeof(u32));
	if (!pg)
		return ERR_PTR(-ENOMEM);

	pg->pg_temp.len = len;
	for (i = 0; i < len; i++)
		pg->pg_temp.osds[i] = ceph_decode_32(p);

	return pg;

e_inval:
	return ERR_PTR(-EINVAL);
}

static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
				 false);
}

static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
				 true);
}

static struct ceph_pg_mapping *__decode_primary_temp(void **p, void *end,
						     bool incremental)
{
	struct ceph_pg_mapping *pg;
	u32 osd;

	ceph_decode_32_safe(p, end, osd, e_inval);
	if (osd == (u32)-1 && incremental)
		return NULL;	/* new_primary_temp: -1 to remove */

	pg = alloc_pg_mapping(0);
	if (!pg)
		return ERR_PTR(-ENOMEM);

	pg->primary_temp.osd = osd;
	return pg;

e_inval:
	return ERR_PTR(-EINVAL);
}

static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->primary_temp,
				 __decode_primary_temp, false);
}

static int decode_new_primary_temp(void **p, void *end,
				   struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->primary_temp,
				 __decode_primary_temp, true);
}

u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd)
{
	BUG_ON(osd >= map->max_osd);

	if (!map->osd_primary_affinity)
		return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;

	return map->osd_primary_affinity[osd];
}

static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff)
{
	BUG_ON(osd >= map->max_osd);

	if (!map->osd_primary_affinity) {
		int i;

		map->osd_primary_affinity = kmalloc(map->max_osd*sizeof(u32),
						    GFP_NOFS);
		if (!map->osd_primary_affinity)
			return -ENOMEM;

		for (i = 0; i < map->max_osd; i++)
			map->osd_primary_affinity[i] =
			    CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
	}

	map->osd_primary_affinity[osd] = aff;

	return 0;
}

static int decode_primary_affinity(void **p, void *end,
				   struct ceph_osdmap *map)
{
	u32 len, i;

	ceph_decode_32_safe(p, end, len, e_inval);
	if (len == 0) {
		kfree(map->osd_primary_affinity);
		map->osd_primary_affinity = NULL;
		return 0;
	}
	if (len != map->max_osd)
		goto e_inval;

	ceph_decode_need(p, end, map->max_osd*sizeof(u32), e_inval);

	for (i = 0; i < map->max_osd; i++) {
		int ret;

		ret = set_primary_affinity(map, i, ceph_decode_32(p));
		if (ret)
			return ret;
	}

	return 0;

e_inval:
	return -EINVAL;
}

static int decode_new_primary_affinity(void **p, void *end,
				       struct ceph_osdmap *map)
{
	u32 n;

	ceph_decode_32_safe(p, end, n, e_inval);
	while (n--) {
		u32 osd, aff;
		int ret;

		ceph_decode_32_safe(p, end, osd, e_inval);
		ceph_decode_32_safe(p, end, aff, e_inval);

		ret = set_primary_affinity(map, osd, aff);
		if (ret)
			return ret;

		pr_info("osd%d primary-affinity 0x%x\n", osd, aff);
	}

	return 0;

e_inval:
	return -EINVAL;
}

static struct ceph_pg_mapping *__decode_pg_upmap(void **p, void *end,
						 bool __unused)
{
	return __decode_pg_temp(p, end, false);
}

static int decode_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
				 false);
}

static int decode_new_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
				 true);
}

static int decode_old_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap, NULL, true);
}

static struct ceph_pg_mapping *__decode_pg_upmap_items(void **p, void *end,
						       bool __unused)
{
	struct ceph_pg_mapping *pg;
	u32 len, i;

	ceph_decode_32_safe(p, end, len, e_inval);
	if (len > (SIZE_MAX - sizeof(*pg)) / (2 * sizeof(u32)))
		return ERR_PTR(-EINVAL);

	ceph_decode_need(p, end, 2 * len * sizeof(u32), e_inval);
	pg = alloc_pg_mapping(2 * len * sizeof(u32));
	if (!pg)
		return ERR_PTR(-ENOMEM);

	pg->pg_upmap_items.len = len;
	for (i = 0; i < len; i++) {
		pg->pg_upmap_items.from_to[i][0] = ceph_decode_32(p);
		pg->pg_upmap_items.from_to[i][1] = ceph_decode_32(p);
	}

	return pg;

e_inval:
	return ERR_PTR(-EINVAL);
}

static int decode_pg_upmap_items(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap_items,
				 __decode_pg_upmap_items, false);
}

static int decode_new_pg_upmap_items(void **p, void *end,
				     struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap_items,
				 __decode_pg_upmap_items, true);
}

static int decode_old_pg_upmap_items(void **p, void *end,
				     struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap_items, NULL, true);
}

/*
 * decode a full map.
 */
static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
{
	u8 struct_v;
	u32 epoch = 0;
	void *start = *p;
	u32 max;
	u32 len, i;
	int err;

	dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));

	err = get_osdmap_client_data_v(p, end, "full", &struct_v);
	if (err)
		goto bad;

	/* fsid, epoch, created, modified */
	ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) +
			 sizeof(map->created) + sizeof(map->modified), e_inval);
	ceph_decode_copy(p, &map->fsid, sizeof(map->fsid));
	epoch = map->epoch = ceph_decode_32(p);
	ceph_decode_copy(p, &map->created, sizeof(map->created));
	ceph_decode_copy(p, &map->modified, sizeof(map->modified));

	/* pools */
	err = decode_pools(p, end, map);
	if (err)
		goto bad;

	/* pool_name */
	err = decode_pool_names(p, end, map);
	if (err)
		goto bad;

	ceph_decode_32_safe(p, end, map->pool_max, e_inval);

	ceph_decode_32_safe(p, end, map->flags, e_inval);

	/* max_osd */
	ceph_decode_32_safe(p, end, max, e_inval);

	/* (re)alloc osd arrays */
	err = osdmap_set_max_osd(map, max);
	if (err)
		goto bad;

	/* osd_state, osd_weight, osd_addrs->client_addr */
	ceph_decode_need(p, end, 3*sizeof(u32) +
			 map->max_osd*((struct_v >= 5 ? sizeof(u32) :
					sizeof(u8)) +
				       sizeof(*map->osd_weight) +
				       sizeof(*map->osd_addr)), e_inval);

	if (ceph_decode_32(p) != map->max_osd)
		goto e_inval;

	if (struct_v >= 5) {
		for (i = 0; i < map->max_osd; i++)
			map->osd_state[i] = ceph_decode_32(p);
	} else {
		for (i = 0; i < map->max_osd; i++)
			map->osd_state[i] = ceph_decode_8(p);
	}

	if (ceph_decode_32(p) != map->max_osd)
		goto e_inval;

	for (i = 0; i < map->max_osd; i++)
		map->osd_weight[i] = ceph_decode_32(p);

	if (ceph_decode_32(p) != map->max_osd)
		goto e_inval;

	ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr));
	for (i = 0; i < map->max_osd; i++)
		ceph_decode_addr(&map->osd_addr[i]);

	/* pg_temp */
	err = decode_pg_temp(p, end, map);
	if (err)
		goto bad;

	/* primary_temp */
	if (struct_v >= 1) {
		err = decode_primary_temp(p, end, map);
		if (err)
			goto bad;
	}

	/* primary_affinity */
	if (struct_v >= 2) {
		err = decode_primary_affinity(p, end, map);
		if (err)
			goto bad;
	} else {
		WARN_ON(map->osd_primary_affinity);
	}

	/* crush */
	ceph_decode_32_safe(p, end, len, e_inval);
	err = osdmap_set_crush(map, crush_decode(*p, min(*p + len, end)));
	if (err)
		goto bad;

	*p += len;
	if (struct_v >= 3) {
		/* erasure_code_profiles */
		ceph_decode_skip_map_of_map(p, end, string, string, string,
					    e_inval);
	}

	if (struct_v >= 4) {
		err = decode_pg_upmap(p, end, map);
		if (err)
			goto bad;

		err = decode_pg_upmap_items(p, end, map);
		if (err)
			goto bad;
	} else {
		WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap));
		WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap_items));
	}

	/* ignore the rest */
	*p = end;

	dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
	return 0;

e_inval:
	err = -EINVAL;
bad:
	pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
	       err, epoch, (int)(*p - start), *p, start, end);
	print_hex_dump(KERN_DEBUG, "osdmap: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       start, end - start, true);
	return err;
}

/*
 * Allocate and decode a full map.
 */
struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end)
{
	struct ceph_osdmap *map;
	int ret;

	map = ceph_osdmap_alloc();
	if (!map)
		return ERR_PTR(-ENOMEM);

	ret = osdmap_decode(p, end, map);
	if (ret) {
		ceph_osdmap_destroy(map);
		return ERR_PTR(ret);
	}

	return map;
}

/*
 * Encoding order is (new_up_client, new_state, new_weight).  Need to
 * apply in the (new_weight, new_state, new_up_client) order, because
 * an incremental map may look like e.g.
 *
 *     new_up_client: { osd=6, addr=... } # set osd_state and addr
 *     new_state: { osd=6, xorstate=EXISTS } # clear osd_state
 */
static int decode_new_up_state_weight(void **p, void *end, u8 struct_v,
				      struct ceph_osdmap *map)
{
	void *new_up_client;
	void *new_state;
	void *new_weight_end;
	u32 len;

	new_up_client = *p;
	ceph_decode_32_safe(p, end, len, e_inval);
	len *= sizeof(u32) + sizeof(struct ceph_entity_addr);
	ceph_decode_need(p, end, len, e_inval);
	*p += len;

	new_state = *p;
	ceph_decode_32_safe(p, end, len, e_inval);
	len *= sizeof(u32) + (struct_v >= 5 ? sizeof(u32) : sizeof(u8));
	ceph_decode_need(p, end, len, e_inval);
	*p += len;

	/* new_weight */
	ceph_decode_32_safe(p, end, len, e_inval);
	while (len--) {
		s32 osd;
		u32 w;

		ceph_decode_need(p, end, 2*sizeof(u32), e_inval);
		osd = ceph_decode_32(p);
		w = ceph_decode_32(p);
		BUG_ON(osd >= map->max_osd);
		pr_info("osd%d weight 0x%x %s\n", osd, w,
		     w == CEPH_OSD_IN ? "(in)" :
		     (w == CEPH_OSD_OUT ? "(out)" : ""));
		map->osd_weight[osd] = w;

		/*
		 * If we are marking in, set the EXISTS, and clear the
		 * AUTOOUT and NEW bits.
		 */
		if (w) {
			map->osd_state[osd] |= CEPH_OSD_EXISTS;
			map->osd_state[osd] &= ~(CEPH_OSD_AUTOOUT |
						 CEPH_OSD_NEW);
		}
	}
	new_weight_end = *p;

	/* new_state (up/down) */
	*p = new_state;
	len = ceph_decode_32(p);
	while (len--) {
		s32 osd;
		u32 xorstate;
		int ret;

		osd = ceph_decode_32(p);
		if (struct_v >= 5)
			xorstate = ceph_decode_32(p);
		else
			xorstate = ceph_decode_8(p);
		if (xorstate == 0)
			xorstate = CEPH_OSD_UP;
		BUG_ON(osd >= map->max_osd);
		if ((map->osd_state[osd] & CEPH_OSD_UP) &&
		    (xorstate & CEPH_OSD_UP))
			pr_info("osd%d down\n", osd);
		if ((map->osd_state[osd] & CEPH_OSD_EXISTS) &&
		    (xorstate & CEPH_OSD_EXISTS)) {
			pr_info("osd%d does not exist\n", osd);
			ret = set_primary_affinity(map, osd,
						   CEPH_OSD_DEFAULT_PRIMARY_AFFINITY);
			if (ret)
				return ret;
			memset(map->osd_addr + osd, 0, sizeof(*map->osd_addr));
			map->osd_state[osd] = 0;
		} else {
			map->osd_state[osd] ^= xorstate;
		}
	}

	/* new_up_client */
	*p = new_up_client;
	len = ceph_decode_32(p);
	while (len--) {
		s32 osd;
		struct ceph_entity_addr addr;

		osd = ceph_decode_32(p);
		ceph_decode_copy(p, &addr, sizeof(addr));
		ceph_decode_addr(&addr);
		BUG_ON(osd >= map->max_osd);
		pr_info("osd%d up\n", osd);
		map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
		map->osd_addr[osd] = addr;
	}

	*p = new_weight_end;
	return 0;

e_inval:
	return -EINVAL;
}

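/*
 * Example of the xorstate logic above (hypothetical state): an osd
 * whose osd_state is CEPH_OSD_EXISTS | CEPH_OSD_UP receiving
 * xorstate = CEPH_OSD_UP is left with just CEPH_OSD_EXISTS set, i.e.
 * it is marked down; an xorstate that also includes CEPH_OSD_EXISTS
 * takes the "does not exist" branch instead and resets the slot.
 */
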
/*
 * decode and apply an incremental map update.
 */
struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
					     struct ceph_osdmap *map)
{
	struct ceph_fsid fsid;
	u32 epoch = 0;
	struct ceph_timespec modified;
	s32 len;
	u64 pool;
	__s64 new_pool_max;
	__s32 new_flags, max;
	void *start = *p;
	int err;
	u8 struct_v;

	dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));

	err = get_osdmap_client_data_v(p, end, "inc", &struct_v);
	if (err)
		goto bad;

	/* fsid, epoch, modified, new_pool_max, new_flags */
	ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) +
			 sizeof(u64) + sizeof(u32), e_inval);
	ceph_decode_copy(p, &fsid, sizeof(fsid));
	epoch = ceph_decode_32(p);
	BUG_ON(epoch != map->epoch+1);
	ceph_decode_copy(p, &modified, sizeof(modified));
	new_pool_max = ceph_decode_64(p);
	new_flags = ceph_decode_32(p);

	/* full map? */
	ceph_decode_32_safe(p, end, len, e_inval);
	if (len > 0) {
		dout("apply_incremental full map len %d, %p to %p\n",
		     len, *p, end);
		return ceph_osdmap_decode(p, min(*p+len, end));
	}

	/* new crush? */
	ceph_decode_32_safe(p, end, len, e_inval);
	if (len > 0) {
		err = osdmap_set_crush(map,
				       crush_decode(*p, min(*p + len, end)));
		if (err)
			goto bad;
		*p += len;
	}

	/* new flags? */
	if (new_flags >= 0)
		map->flags = new_flags;
	if (new_pool_max >= 0)
		map->pool_max = new_pool_max;

	/* new max? */
	ceph_decode_32_safe(p, end, max, e_inval);
	if (max >= 0) {
		err = osdmap_set_max_osd(map, max);
		if (err)
			goto bad;
	}

	map->epoch++;
	map->modified = modified;

	/* new_pools */
	err = decode_new_pools(p, end, map);
	if (err)
		goto bad;

	/* new_pool_names */
	err = decode_pool_names(p, end, map);
	if (err)
		goto bad;

	/* old_pool */
	ceph_decode_32_safe(p, end, len, e_inval);
	while (len--) {
		struct ceph_pg_pool_info *pi;

		ceph_decode_64_safe(p, end, pool, e_inval);
		pi = __lookup_pg_pool(&map->pg_pools, pool);
		if (pi)
			__remove_pg_pool(&map->pg_pools, pi);
	}

	/* new_up_client, new_state, new_weight */
	err = decode_new_up_state_weight(p, end, struct_v, map);
	if (err)
		goto bad;

	/* new_pg_temp */
	err = decode_new_pg_temp(p, end, map);
	if (err)
		goto bad;

	/* new_primary_temp */
	if (struct_v >= 1) {
		err = decode_new_primary_temp(p, end, map);
		if (err)
			goto bad;
	}

	/* new_primary_affinity */
	if (struct_v >= 2) {
		err = decode_new_primary_affinity(p, end, map);
		if (err)
			goto bad;
	}

	if (struct_v >= 3) {
		/* new_erasure_code_profiles */
		ceph_decode_skip_map_of_map(p, end, string, string, string,
					    e_inval);
		/* old_erasure_code_profiles */
		ceph_decode_skip_set(p, end, string, e_inval);
	}

	if (struct_v >= 4) {
		err = decode_new_pg_upmap(p, end, map);
		if (err)
			goto bad;

		err = decode_old_pg_upmap(p, end, map);
		if (err)
			goto bad;

		err = decode_new_pg_upmap_items(p, end, map);
		if (err)
			goto bad;

		err = decode_old_pg_upmap_items(p, end, map);
		if (err)
			goto bad;
	}

	/* ignore the rest */
	*p = end;

	dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
	return map;

e_inval:
	err = -EINVAL;
bad:
	pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
	       err, epoch, (int)(*p - start), *p, start, end);
	print_hex_dump(KERN_DEBUG, "osdmap: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       start, end - start, true);
	return ERR_PTR(err);
}

void ceph_oloc_copy(struct ceph_object_locator *dest,
		    const struct ceph_object_locator *src)
{
	ceph_oloc_destroy(dest);

	dest->pool = src->pool;
	if (src->pool_ns)
		dest->pool_ns = ceph_get_string(src->pool_ns);
	else
		dest->pool_ns = NULL;
}
EXPORT_SYMBOL(ceph_oloc_copy);

void ceph_oloc_destroy(struct ceph_object_locator *oloc)
{
	ceph_put_string(oloc->pool_ns);
}
EXPORT_SYMBOL(ceph_oloc_destroy);

void ceph_oid_copy(struct ceph_object_id *dest,
		   const struct ceph_object_id *src)
{
	ceph_oid_destroy(dest);

	if (src->name != src->inline_name) {
		/* very rare, see ceph_object_id definition */
		dest->name = kmalloc(src->name_len + 1,
				     GFP_NOIO | __GFP_NOFAIL);
	} else {
		dest->name = dest->inline_name;
	}
	memcpy(dest->name, src->name, src->name_len + 1);
	dest->name_len = src->name_len;
}
EXPORT_SYMBOL(ceph_oid_copy);

static __printf(2, 0)
int oid_printf_vargs(struct ceph_object_id *oid, const char *fmt, va_list ap)
{
	int len;

	WARN_ON(!ceph_oid_empty(oid));

	len = vsnprintf(oid->inline_name, sizeof(oid->inline_name), fmt, ap);
	if (len >= sizeof(oid->inline_name))
		return len;

	oid->name_len = len;
	return 0;
}

/*
 * If oid doesn't fit into inline buffer, BUG.
 */
void ceph_oid_printf(struct ceph_object_id *oid, const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	BUG_ON(oid_printf_vargs(oid, fmt, ap));
	va_end(ap);
}
EXPORT_SYMBOL(ceph_oid_printf);

static __printf(3, 0)
int oid_aprintf_vargs(struct ceph_object_id *oid, gfp_t gfp,
		      const char *fmt, va_list ap)
{
	va_list aq;
	int len;

	va_copy(aq, ap);
	len = oid_printf_vargs(oid, fmt, aq);
	va_end(aq);

	if (len) {
		char *external_name;

		external_name = kmalloc(len + 1, gfp);
		if (!external_name)
			return -ENOMEM;

		oid->name = external_name;
		WARN_ON(vsnprintf(oid->name, len + 1, fmt, ap) != len);
		oid->name_len = len;
	}

	return 0;
}

/*
 * If oid doesn't fit into inline buffer, allocate.
 */
int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
		     const char *fmt, ...)
{
	va_list ap;
	int ret;

	va_start(ap, fmt);
	ret = oid_aprintf_vargs(oid, gfp, fmt, ap);
	va_end(ap);

	return ret;
}
EXPORT_SYMBOL(ceph_oid_aprintf);

void ceph_oid_destroy(struct ceph_object_id *oid)
{
	if (oid->name != oid->inline_name)
		kfree(oid->name);
}
EXPORT_SYMBOL(ceph_oid_destroy);

static bool __osds_equal(const struct ceph_osds *lhs,
			 const struct ceph_osds *rhs)
{
	if (lhs->size == rhs->size &&
	    !memcmp(lhs->osds, rhs->osds, rhs->size * sizeof(rhs->osds[0])))
		return true;

	return false;
}

static bool osds_equal(const struct ceph_osds *lhs,
		       const struct ceph_osds *rhs)
{
	if (__osds_equal(lhs, rhs) &&
	    lhs->primary == rhs->primary)
		return true;

	return false;
}

static bool osds_valid(const struct ceph_osds *set)
{
	/* non-empty set */
	if (set->size > 0 && set->primary >= 0)
		return true;

	/* empty can_shift_osds set */
	if (!set->size && set->primary == -1)
		return true;

	/* empty !can_shift_osds set - all NONE */
	if (set->size > 0 && set->primary == -1) {
		int i;

		for (i = 0; i < set->size; i++) {
			if (set->osds[i] != CRUSH_ITEM_NONE)
				return false;
		}
		return true;
	}

	return false;
}

void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src)
{
	memcpy(dest->osds, src->osds, src->size * sizeof(src->osds[0]));
	dest->size = src->size;
	dest->primary = src->primary;
}

bool ceph_pg_is_split(const struct ceph_pg *pgid, u32 old_pg_num,
		      u32 new_pg_num)
{
	int old_bits = calc_bits_of(old_pg_num);
	int old_mask = (1 << old_bits) - 1;
	int n;

	WARN_ON(pgid->seed >= old_pg_num);
	if (new_pg_num <= old_pg_num)
		return false;

	for (n = 1; ; n++) {
		int next_bit = n << (old_bits - 1);
		u32 s = next_bit | pgid->seed;

		if (s < old_pg_num || s == pgid->seed)
			continue;
		if (s >= new_pg_num)
			break;

		s = ceph_stable_mod(s, old_pg_num, old_mask);
		if (s == pgid->seed)
			return true;
	}

	return false;
}

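/*
 * Worked example (illustrative numbers): for seed = 1, old_pg_num = 4
 * and new_pg_num = 8, old_bits = calc_bits_of(4) = 3, so n = 1 probes
 * s = (1 << 2) | 1 = 5.  5 folds back to ceph_stable_mod(5, 4, 7) = 1,
 * which matches the seed, so the PG is split (child 5 is carved out
 * of PG 1).
 */
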
bool ceph_is_new_interval(const struct ceph_osds *old_acting,
			  const struct ceph_osds *new_acting,
			  const struct ceph_osds *old_up,
			  const struct ceph_osds *new_up,
			  int old_size,
			  int new_size,
			  int old_min_size,
			  int new_min_size,
			  u32 old_pg_num,
			  u32 new_pg_num,
			  bool old_sort_bitwise,
			  bool new_sort_bitwise,
			  bool old_recovery_deletes,
			  bool new_recovery_deletes,
			  const struct ceph_pg *pgid)
{
	return !osds_equal(old_acting, new_acting) ||
	       !osds_equal(old_up, new_up) ||
	       old_size != new_size ||
	       old_min_size != new_min_size ||
	       ceph_pg_is_split(pgid, old_pg_num, new_pg_num) ||
	       old_sort_bitwise != new_sort_bitwise ||
	       old_recovery_deletes != new_recovery_deletes;
}

static int calc_pg_rank(int osd, const struct ceph_osds *acting)
{
	int i;

	for (i = 0; i < acting->size; i++) {
		if (acting->osds[i] == osd)
			return i;
	}

	return -1;
}

*old_acting
,
2112 const struct ceph_osds
*new_acting
)
2114 if (!old_acting
->size
&& !new_acting
->size
)
2115 return false; /* both still empty */
2117 if (!old_acting
->size
^ !new_acting
->size
)
2118 return true; /* was empty, now not, or vice versa */
2120 if (old_acting
->primary
!= new_acting
->primary
)
2121 return true; /* primary changed */
2123 if (calc_pg_rank(old_acting
->primary
, old_acting
) !=
2124 calc_pg_rank(new_acting
->primary
, new_acting
))
2127 return false; /* same primary (tho replicas may have changed) */
bool ceph_osds_changed(const struct ceph_osds *old_acting,
		       const struct ceph_osds *new_acting,
		       bool any_change)
{
	if (primary_changed(old_acting, new_acting))
		return true;

	if (any_change && !__osds_equal(old_acting, new_acting))
		return true;

	return false;
}

/*
 * calculate file layout from given offset, length.
 * fill in correct oid, logical length, and object extent
 * offset, length.
 *
 * for now, we write only a single su, until we can
 * pass a stride back to the caller.
 */
int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
				  u64 off, u64 len,
				  u64 *ono,
				  u64 *oxoff, u64 *oxlen)
{
	u32 osize = layout->object_size;
	u32 su = layout->stripe_unit;
	u32 sc = layout->stripe_count;
	u32 bl, stripeno, stripepos, objsetno;
	u32 su_per_object;
	u64 t, su_offset;

	dout("mapping %llu~%llu osize %u fl_su %u\n", off, len,
	     osize, su);
	if (su == 0 || sc == 0)
		goto invalid;
	su_per_object = osize / su;
	if (su_per_object == 0)
		goto invalid;
	dout("osize %u / su %u = su_per_object %u\n", osize, su,
	     su_per_object);

	if ((su & ~PAGE_MASK) != 0)
		goto invalid;

	/* bl = *off / su; */
	t = off;
	do_div(t, su);
	bl = t;
	dout("off %llu / su %u = bl %u\n", off, su, bl);

	stripeno = bl / sc;
	stripepos = bl % sc;
	objsetno = stripeno / su_per_object;

	*ono = objsetno * sc + stripepos;
	dout("objset %u * sc %u = ono %u\n", objsetno, sc, (unsigned int)*ono);

	/* *oxoff = *off % layout->fl_stripe_unit;  # offset in su */
	t = off;
	su_offset = do_div(t, su);
	*oxoff = su_offset + (stripeno % su_per_object) * su;

	/*
	 * Calculate the length of the extent being written to the selected
	 * object. This is the minimum of the full length requested (len) or
	 * the remainder of the current stripe being written to.
	 */
	*oxlen = min_t(u64, len, su - su_offset);

	dout(" obj extent %llu~%llu\n", *oxoff, *oxlen);
	return 0;

invalid:
	dout(" invalid layout\n");
	*ono = 0;
	*oxoff = 0;
	*oxlen = 0;
	return -EINVAL;
}
EXPORT_SYMBOL(ceph_calc_file_object_mapping);

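/*
 * Worked example (illustrative layout): with object_size = 4M,
 * stripe_unit = 1M and stripe_count = 2, an I/O at off = 5M gives
 * bl = 5, stripeno = 2, stripepos = 1 and objsetno = 0, so
 * *ono = 0 * 2 + 1 = 1, *oxoff = (2 % 4) * 1M = 2M, and *oxlen is
 * capped at the 1M remaining in the stripe unit.
 */
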
/*
 * Map an object into a PG.
 *
 * Should only be called with target_oid and target_oloc (as opposed to
 * base_oid and base_oloc), since tiering isn't taken into account.
 */
int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
				const struct ceph_object_id *oid,
				const struct ceph_object_locator *oloc,
				struct ceph_pg *raw_pgid)
{
	WARN_ON(pi->id != oloc->pool);

	if (!oloc->pool_ns) {
		raw_pgid->pool = oloc->pool;
		raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
					       oid->name_len);
		dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
		     raw_pgid->pool, raw_pgid->seed);
	} else {
		char stack_buf[256];
		char *buf = stack_buf;
		int nsl = oloc->pool_ns->len;
		size_t total = nsl + 1 + oid->name_len;

		if (total > sizeof(stack_buf)) {
			buf = kmalloc(total, GFP_NOIO);
			if (!buf)
				return -ENOMEM;
		}
		memcpy(buf, oloc->pool_ns->str, nsl);
		buf[nsl] = '\037';
		memcpy(buf + nsl + 1, oid->name, oid->name_len);
		raw_pgid->pool = oloc->pool;
		raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total);
		if (buf != stack_buf)
			kfree(buf);
		dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__,
		     oid->name, nsl, oloc->pool_ns->str,
		     raw_pgid->pool, raw_pgid->seed);
	}
	return 0;
}

int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
			      const struct ceph_object_id *oid,
			      const struct ceph_object_locator *oloc,
			      struct ceph_pg *raw_pgid)
{
	struct ceph_pg_pool_info *pi;

	pi = ceph_pg_pool_by_id(osdmap, oloc->pool);
	if (!pi)
		return -ENOENT;

	return __ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid);
}
EXPORT_SYMBOL(ceph_object_locator_to_pg);

/*
 * Map a raw PG (full precision ps) into an actual PG.
 */
static void raw_pg_to_pg(struct ceph_pg_pool_info *pi,
			 const struct ceph_pg *raw_pgid,
			 struct ceph_pg *pgid)
{
	pgid->pool = raw_pgid->pool;
	pgid->seed = ceph_stable_mod(raw_pgid->seed, pi->pg_num,
				     pi->pg_num_mask);
}

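/*
 * Worked example (illustrative numbers): with pg_num = 12 and
 * pg_num_mask = 15, a raw seed of 13 has (13 & 15) = 13 >= 12, so
 * ceph_stable_mod() folds it to 13 & (15 >> 1) = 5, while a raw seed
 * of 9 is already < 12 and maps to itself.
 */
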
/*
 * Map a raw PG (full precision ps) into a placement ps (placement
 * seed).  Include pool id in that value so that different pools don't
 * use the same seeds.
 */
static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi,
			 const struct ceph_pg *raw_pgid)
{
	if (pi->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
		/* hash pool id and seed so that pool PGs do not overlap */
		return crush_hash32_2(CRUSH_HASH_RJENKINS1,
				      ceph_stable_mod(raw_pgid->seed,
						      pi->pgp_num,
						      pi->pgp_num_mask),
				      raw_pgid->pool);
	} else {
		/*
		 * legacy behavior: add ps and pool together.  this is
		 * not a great approach because the PGs from each pool
		 * will overlap on top of each other: 0.5 == 1.4 ==
		 * 2.3
		 */
		return ceph_stable_mod(raw_pgid->seed, pi->pgp_num,
				       pi->pgp_num_mask) +
		       (unsigned)raw_pgid->pool;
	}
}

/*
 * Magic value used for a "default" fallback choose_args, used if the
 * crush_choose_arg_map passed to do_crush() does not exist.  If this
 * also doesn't exist, fall back to canonical weights.
 */
#define CEPH_DEFAULT_CHOOSE_ARGS	-1

static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
		    int *result, int result_max,
		    const __u32 *weight, int weight_max,
		    s64 choose_args_index)
{
	struct crush_choose_arg_map *arg_map;
	int r;

	BUG_ON(result_max > CEPH_PG_MAX_SIZE);

	arg_map = lookup_choose_arg_map(&map->crush->choose_args,
					choose_args_index);
	if (!arg_map)
		arg_map = lookup_choose_arg_map(&map->crush->choose_args,
						CEPH_DEFAULT_CHOOSE_ARGS);

	mutex_lock(&map->crush_workspace_mutex);
	r = crush_do_rule(map->crush, ruleno, x, result, result_max,
			  weight, weight_max, map->crush_workspace,
			  arg_map ? arg_map->args : NULL);
	mutex_unlock(&map->crush_workspace_mutex);

	return r;
}

*osdmap
,
2345 struct ceph_pg_pool_info
*pi
,
2346 struct ceph_osds
*set
)
2350 if (ceph_can_shift_osds(pi
)) {
2354 for (i
= 0; i
< set
->size
; i
++) {
2355 if (!ceph_osd_exists(osdmap
, set
->osds
[i
])) {
2360 set
->osds
[i
- removed
] = set
->osds
[i
];
2362 set
->size
-= removed
;
2364 /* set dne devices to NONE */
2365 for (i
= 0; i
< set
->size
; i
++) {
2366 if (!ceph_osd_exists(osdmap
, set
->osds
[i
]))
2367 set
->osds
[i
] = CRUSH_ITEM_NONE
;
/*
 * Calculate raw set (CRUSH output) for given PG and filter out
 * nonexistent OSDs.  ->primary is undefined for a raw set.
 *
 * Placement seed (CRUSH input) is returned through @ppps.
 */
static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
			   struct ceph_pg_pool_info *pi,
			   const struct ceph_pg *raw_pgid,
			   struct ceph_osds *raw,
			   u32 *ppps)
{
	u32 pps = raw_pg_to_pps(pi, raw_pgid);
	int ruleno;
	int len;

	ceph_osds_init(raw);
	if (ppps)
		*ppps = pps;

	ruleno = crush_find_rule(osdmap->crush, pi->crush_ruleset, pi->type,
				 pi->size);
	if (ruleno < 0) {
		pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
		       pi->id, pi->crush_ruleset, pi->type, pi->size);
		return;
	}

	if (pi->size > ARRAY_SIZE(raw->osds)) {
		pr_err_ratelimited("pool %lld ruleset %d type %d too wide: size %d > %zu\n",
		       pi->id, pi->crush_ruleset, pi->type, pi->size,
		       ARRAY_SIZE(raw->osds));
		return;
	}

	len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size,
		       osdmap->osd_weight, osdmap->max_osd, pi->id);
	if (len < 0) {
		pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
		       len, ruleno, pi->id, pi->crush_ruleset, pi->type,
		       pi->size);
		return;
	}

	raw->size = len;
	remove_nonexistent_osds(osdmap, pi, raw);
}

/* apply pg_upmap[_items] mappings */
static void apply_upmap(struct ceph_osdmap *osdmap,
			const struct ceph_pg *pgid,
			struct ceph_osds *raw)
{
	struct ceph_pg_mapping *pg;
	int i, j;

	pg = lookup_pg_mapping(&osdmap->pg_upmap, pgid);
	if (pg) {
		/* make sure targets aren't marked out */
		for (i = 0; i < pg->pg_upmap.len; i++) {
			int osd = pg->pg_upmap.osds[i];

			if (osd != CRUSH_ITEM_NONE &&
			    osd < osdmap->max_osd &&
			    osdmap->osd_weight[osd] == 0) {
				/* reject/ignore explicit mapping */
				return;
			}
		}
		for (i = 0; i < pg->pg_upmap.len; i++)
			raw->osds[i] = pg->pg_upmap.osds[i];
		raw->size = pg->pg_upmap.len;
		/* check and apply pg_upmap_items, if any */
	}

	pg = lookup_pg_mapping(&osdmap->pg_upmap_items, pgid);
	if (pg) {
		/*
		 * Note: this approach does not allow a bidirectional swap,
		 * e.g., [[1,2],[2,1]] applied to [0,1,2] -> [0,2,1].
		 */
		for (i = 0; i < pg->pg_upmap_items.len; i++) {
			int from = pg->pg_upmap_items.from_to[i][0];
			int to = pg->pg_upmap_items.from_to[i][1];
			int pos = -1;
			bool exists = false;

			/* make sure replacement doesn't already appear */
			for (j = 0; j < raw->size; j++) {
				int osd = raw->osds[j];

				if (osd == to) {
					exists = true;
					break;
				}
				/* ignore mapping if target is marked out */
				if (osd == from && pos < 0 &&
				    !(to != CRUSH_ITEM_NONE &&
				      to < osdmap->max_osd &&
				      osdmap->osd_weight[to] == 0)) {
					pos = j;
				}
			}
			if (!exists && pos >= 0)
				raw->osds[pos] = to;
		}
	}
}

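/*
 * Example of the pg_upmap_items logic above (hypothetical mapping):
 * applying [[1,7]] to a raw set [0,1,2] replaces osd1 with osd7 and
 * yields [0,7,2]; applying [[1,2]] to the same set is a no-op
 * because the replacement osd2 already appears in the set.
 */
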
/*
 * Given raw set, calculate up set and up primary.  By definition of an
 * up set, the result won't contain nonexistent or down OSDs.
 *
 * This is done in-place - on return @set is the up set.  If it's
 * empty, ->primary will remain undefined.
 */
static void raw_to_up_osds(struct ceph_osdmap *osdmap,
			   struct ceph_pg_pool_info *pi,
			   struct ceph_osds *set)
{
	int i;

	/* ->primary is undefined for a raw set */
	BUG_ON(set->primary != -1);

	if (ceph_can_shift_osds(pi)) {
		int removed = 0;

		/* shift left */
		for (i = 0; i < set->size; i++) {
			if (ceph_osd_is_down(osdmap, set->osds[i])) {
				removed++;
				continue;
			}
			if (removed)
				set->osds[i - removed] = set->osds[i];
		}
		set->size -= removed;
		if (set->size > 0)
			set->primary = set->osds[0];
	} else {
		/* set down/dne devices to NONE */
		for (i = set->size - 1; i >= 0; i--) {
			if (ceph_osd_is_down(osdmap, set->osds[i]))
				set->osds[i] = CRUSH_ITEM_NONE;
			else
				set->primary = set->osds[i];
		}
	}
}

static void apply_primary_affinity(struct ceph_osdmap *osdmap,
				   struct ceph_pg_pool_info *pi,
				   u32 pps,
				   struct ceph_osds *up)
{
	int i;
	int pos = -1;

	/*
	 * Do we have any non-default primary_affinity values for these
	 * osds?
	 */
	if (!osdmap->osd_primary_affinity)
		return;

	for (i = 0; i < up->size; i++) {
		int osd = up->osds[i];

		if (osd != CRUSH_ITEM_NONE &&
		    osdmap->osd_primary_affinity[osd] !=
					CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
			break;
		}
	}
	if (i == up->size)
		return;

	/*
	 * Pick the primary.  Feed both the seed (for the pg) and the
	 * osd into the hash/rng so that a proportional fraction of an
	 * osd's pgs get rejected as primary.
	 */
	for (i = 0; i < up->size; i++) {
		int osd = up->osds[i];
		u32 aff;

		if (osd == CRUSH_ITEM_NONE)
			continue;

		aff = osdmap->osd_primary_affinity[osd];
		if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY &&
		    (crush_hash32_2(CRUSH_HASH_RJENKINS1,
				    pps, osd) >> 16) >= aff) {
			/*
			 * We chose not to use this primary.  Note it
			 * anyway as a fallback in case we don't pick
			 * anyone else, but keep looking.
			 */
			if (pos < 0)
				pos = i;
		} else {
			pos = i;
			break;
		}
	}
	if (pos < 0)
		return;

	up->primary = up->osds[pos];

	if (ceph_can_shift_osds(pi) && pos > 0) {
		/* move the new primary to the front */
		for (i = pos; i > 0; i--)
			up->osds[i] = up->osds[i - 1];
		up->osds[0] = up->primary;
	}
}

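/*
 * The rejection test above is effectively a deterministic coin flip:
 * crush_hash32_2(pps, osd) >> 16 yields a pseudo-random value in
 * [0, 0x10000), so an osd with aff = CEPH_OSD_MAX_PRIMARY_AFFINITY / 2
 * keeps roughly half of the primary slots it would otherwise get,
 * while aff = 0 rejects all of them.
 */
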
/*
 * Get pg_temp and primary_temp mappings for given PG.
 *
 * Note that a PG may have none, only pg_temp, only primary_temp or
 * both pg_temp and primary_temp mappings.  This means @temp isn't
 * always a valid OSD set on return: in the "only primary_temp" case,
 * @temp will have its ->primary >= 0 but ->size == 0.
 */
static void get_temp_osds(struct ceph_osdmap *osdmap,
			  struct ceph_pg_pool_info *pi,
			  const struct ceph_pg *pgid,
			  struct ceph_osds *temp)
{
	struct ceph_pg_mapping *pg;
	int i;

	ceph_osds_init(temp);

	/* pg_temp? */
	pg = lookup_pg_mapping(&osdmap->pg_temp, pgid);
	if (pg) {
		for (i = 0; i < pg->pg_temp.len; i++) {
			if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
				if (ceph_can_shift_osds(pi))
					continue;

				temp->osds[temp->size++] = CRUSH_ITEM_NONE;
			} else {
				temp->osds[temp->size++] = pg->pg_temp.osds[i];
			}
		}

		/* apply pg_temp's primary */
		for (i = 0; i < temp->size; i++) {
			if (temp->osds[i] != CRUSH_ITEM_NONE) {
				temp->primary = temp->osds[i];
				break;
			}
		}
	}

	/* primary_temp? */
	pg = lookup_pg_mapping(&osdmap->primary_temp, pgid);
	if (pg)
		temp->primary = pg->primary_temp.osd;
}

/*
 * Map a PG to its acting set as well as its up set.
 *
 * Acting set is used for data mapping purposes, while up set can be
 * recorded for detecting interval changes and deciding whether to
 * resend a request.
 */
void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
			       struct ceph_pg_pool_info *pi,
			       const struct ceph_pg *raw_pgid,
			       struct ceph_osds *up,
			       struct ceph_osds *acting)
{
	struct ceph_pg pgid;
	u32 pps;

	WARN_ON(pi->id != raw_pgid->pool);
	raw_pg_to_pg(pi, raw_pgid, &pgid);

	pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps);
	apply_upmap(osdmap, &pgid, up);
	raw_to_up_osds(osdmap, pi, up);
	apply_primary_affinity(osdmap, pi, pps, up);
	get_temp_osds(osdmap, pi, &pgid, acting);
	if (!acting->size) {
		memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0]));
		acting->size = up->size;
		if (acting->primary == -1)
			acting->primary = up->primary;
	}
	WARN_ON(!osds_valid(up) || !osds_valid(acting));
}

bool ceph_pg_to_primary_shard(struct ceph_osdmap *osdmap,
			      struct ceph_pg_pool_info *pi,
			      const struct ceph_pg *raw_pgid,
			      struct ceph_spg *spgid)
{
	struct ceph_pg pgid;
	struct ceph_osds up, acting;
	int i;

	WARN_ON(pi->id != raw_pgid->pool);
	raw_pg_to_pg(pi, raw_pgid, &pgid);

	if (ceph_can_shift_osds(pi)) {
		spgid->pgid = pgid; /* struct */
		spgid->shard = CEPH_SPG_NOSHARD;
		return true;
	}

	ceph_pg_to_up_acting_osds(osdmap, pi, &pgid, &up, &acting);
	for (i = 0; i < acting.size; i++) {
		if (acting.osds[i] == acting.primary) {
			spgid->pgid = pgid; /* struct */
			spgid->shard = i;
			return true;
		}
	}

	return false;
}

/*
 * Return acting primary for given PG, or -1 if none.
 */
int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
			      const struct ceph_pg *raw_pgid)
{
	struct ceph_pg_pool_info *pi;
	struct ceph_osds up, acting;

	pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
	if (!pi)
		return -1;

	ceph_pg_to_up_acting_osds(osdmap, pi, raw_pgid, &up, &acting);
	return acting.primary;
}
EXPORT_SYMBOL(ceph_pg_to_acting_primary);