/* authorize an outgoing connection */ struct ceph_auth_handshake *(*get_authorizer) ( struct ceph_connection *con, int *proto, int force_new); int (*add_authorizer_challenge)(struct ceph_connection *con, void *challenge_buf, int challenge_buf_len); int (*verify_authorizer_reply) (struct ceph_connection *con); int (*invalidate_authorizer)(struct ceph_connection *con);
/* there was some error on the socket (disconnect, whatever) */ void (*fault) (struct ceph_connection *con);
/* a remote host has terminated a message exchange session, and messages
* we sent (or they tried to send us) may be lost. */ void (*peer_reset) (struct ceph_connection *con);
int (*sign_message) (struct ceph_msg *msg); int (*check_message_signature) (struct ceph_msg *msg);
/* msgr2 authentication exchange */ int (*get_auth_request)(struct ceph_connection *con, void *buf, int *buf_len, void **authorizer, int *authorizer_len); int (*handle_auth_reply_more)(struct ceph_connection *con, void *reply, int reply_len, void *buf, int *buf_len, void **authorizer, int *authorizer_len); int (*handle_auth_done)(struct ceph_connection *con,
u64 global_id, void *reply, int reply_len,
u8 *session_key, int *session_key_len,
u8 *con_secret, int *con_secret_len); int (*handle_auth_bad_method)(struct ceph_connection *con, int used_proto, int result, constint *allowed_protos, int proto_cnt, constint *allowed_modes, int mode_cnt);
/** * sparse_read: read sparse data * @con: connection we're reading from * @cursor: data cursor for reading extents * @buf: optional buffer to read into * * This should be called more than once, each time setting up to * receive an extent into the current cursor position, and zeroing * the holes between them. * * Returns amount of data to be read (in bytes), 0 if reading is * complete, or -errno if there was an error. * * If @buf is set on a >0 return, then the data should be read into * the provided buffer. Otherwise, it should be read into the cursor. * * The sparse read operation is expected to initialize the cursor * with a length covering up to the end of the last extent.
*/ int (*sparse_read)(struct ceph_connection *con, struct ceph_msg_data_cursor *cursor, char **buf);
};
/*
 * Expands to two comma-separated arguments describing entity name @n:
 * ceph_entity_type_name() applied to its type and le64_to_cpu() applied
 * to its num.  Pair it with the format string "%s%lld".
 */
#define ENTITY_NAME(n) \
	ceph_entity_type_name((n).type), \
	le64_to_cpu((n).num)
/* * the global_seq counts connections I (attempt to) initiate * in order to disambiguate certain connect race conditions.
*/
u32 global_seq;
spinlock_t global_seq_lock;
};
/*
 * Identifies what a message's data payload is read from or written to.
 * The bio variant only exists when CONFIG_BLOCK is defined, so the numeric
 * values of the enumerators after it depend on that option.
 */
enum ceph_msg_data_type {
	CEPH_MSG_DATA_NONE,	/* message contains no data payload */
	CEPH_MSG_DATA_PAGES,	/* data source/destination is a page array */
	CEPH_MSG_DATA_PAGELIST,	/* data source/destination is a pagelist */
#ifdef CONFIG_BLOCK
	CEPH_MSG_DATA_BIO,	/* data source/destination is a bio list */
#endif /* CONFIG_BLOCK */
	CEPH_MSG_DATA_BVECS,	/* data source/destination is a bio_vec array */
	CEPH_MSG_DATA_ITER,	/* data source/destination is an iov_iter */
};
#ifdef CONFIG_BLOCK
/*
 * Iterator over a chain of bios: the bio currently being walked plus a
 * bvec_iter recording the position inside it.
 */
struct ceph_bio_iter {
	struct bio *bio;		/* bio currently being walked */
	struct bvec_iter iter;		/* position within that bio */
};
/*
 * Advance @it by @n bytes, one chunk at a time.
 *
 * Each chunk is the smaller of the bytes left in the current bio's iterator
 * and the bytes still to advance.  STEP is evaluated once per chunk, before
 * the iterator moves, and may refer to __cur_n (the chunk size).  When the
 * current bio is used up and has a successor (bi_next), the iterator is
 * switched to that bio and restarted from its bi_iter.
 *
 * Hits BUG_ON() if bytes remain to be advanced but the iterator is empty.
 */
#define __ceph_bio_iter_advance_step(it, n, STEP) do {			\
	unsigned int __n = (n), __cur_n;				\
									\
	while (__n) {							\
		BUG_ON(!(it)->iter.bi_size);				\
		__cur_n = min((it)->iter.bi_size, __n);			\
		(void)(STEP);						\
		bio_advance_iter((it)->bio, &(it)->iter, __cur_n);	\
		if (!(it)->iter.bi_size && (it)->bio->bi_next) {	\
			dout("__ceph_bio_iter_advance_step next bio\n");	\
			(it)->bio = (it)->bio->bi_next;			\
			(it)->iter = (it)->bio->bi_iter;		\
		}							\
		__n -= __cur_n;						\
	}								\
} while (0)
/* Move @it forward by @n bytes; the per-chunk step is a no-op (0). */
#define ceph_bio_iter_advance(it, n) __ceph_bio_iter_advance_step(it, n, 0)
/*
 * Advance @it by @n bytes, executing BVEC_STEP for each bio_vec.
 *
 * For every chunk handed out by __ceph_bio_iter_advance_step(), a private
 * copy of the iterator is clamped to the chunk size (__cur_n) and walked
 * with __bio_for_each_segment(); BVEC_STEP runs once per segment and can
 * refer to the current segment as "bv".
 */
#define ceph_bio_iter_advance_step(it, n, BVEC_STEP)			\
	__ceph_bio_iter_advance_step(it, n, ({				\
		struct bio_vec bv;					\
		struct bvec_iter __cur_iter;				\
									\
		__cur_iter = (it)->iter;				\
		__cur_iter.bi_size = __cur_n;				\
		__bio_for_each_segment(bv, (it)->bio, __cur_iter, __cur_iter) \
			(void)(BVEC_STEP);				\
	}))
struct ceph_msg_data_cursor {
size_t total_resid; /* across all data items */
struct ceph_msg_data *data; /* current data item */
size_t resid; /* bytes not yet consumed */ int sr_resid; /* residual sparse_read len */ bool need_crc; /* crc update needed */ union { #ifdef CONFIG_BLOCK struct ceph_bio_iter bio_iter; #endif/* CONFIG_BLOCK */ struct bvec_iter bvec_iter; struct { /* pages */ unsignedint page_offset; /* offset in page */ unsignedshort page_index; /* index in array */ unsignedshort page_count; /* pages in array */
}; struct { /* pagelist */ struct page *page; /* page from list */
size_t offset; /* bytes from list */
}; struct { struct iov_iter iov_iter; unsignedint lastlen;
};
};
};
/* * a single message. it contains a header (src, dest, message type, etc.), * footer (crc values, mainly), a "front" message body, and possibly a * data payload (stored in some number of pages).
*/ struct ceph_msg { struct ceph_msg_header hdr; /* header */ union { struct ceph_msg_footer footer; /* footer */ struct ceph_msg_footer_old old_footer; /* old format footer */
}; struct kvec front; /* unaligned blobs of message */ struct ceph_buffer *middle;
size_t data_length; struct ceph_msg_data *data; int num_data_items; int max_data_items; struct ceph_msg_data_cursor cursor;
/*
 * ceph_connection flag bits
 *
 * Each definition sits on its own line with a single-line comment so that
 * no comment can swallow the newline separating two directives.
 */
/* we can close channel or drop messages on errors */
#define CEPH_CON_F_LOSSYTX		0
/* we need to send a keepalive */
#define CEPH_CON_F_KEEPALIVE_PENDING	1
/* we have data ready to send */
#define CEPH_CON_F_WRITE_PENDING	2
/* socket state changed to closed */
#define CEPH_CON_F_SOCK_CLOSED		3
/* need to retry queuing delayed work */
#define CEPH_CON_F_BACKOFF		4
/*
 * State held in a ceph_connection_v1_info: bookkeeping for the outgoing
 * kvec array, the auth handshake handle, sparse-read progress, scratch
 * space for incoming/outgoing headers and acks, and sequence numbers.
 */
struct ceph_connection_v1_info {
	/* header/footer data being sent */
	struct kvec out_kvec[8];
	struct kvec *out_kvec_cur;
	int out_kvec_left;		/* kvecs remaining in out_kvec */
	int out_skip;			/* number of bytes to skip */
	int out_kvec_bytes;		/* total bytes remaining */
	bool out_more;			/* more data follows the kvecs */
	bool out_msg_done;

	struct ceph_auth_handshake *auth;
	int auth_retry;			/* set when a newer authorizer is needed */

	/* sparse reads */
	struct kvec in_sr_kvec;		/* where the current receive goes */
	u64 in_sr_len;			/* amount of data in this extent */

	/* temporaries for the incoming message */
	u8 in_tag;			/* protocol control byte */
	struct ceph_msg_header in_hdr;
	__le64 in_temp_ack;		/* used while reading an ack */

	/* temporaries for the outgoing message */
	struct ceph_msg_header out_hdr;
	__le64 out_temp_ack;		/* used while writing an ack */
	/* used while writing the keepalive2 stamp */
	struct ceph_timespec out_temp_keepalive2;

	/* identifies the most recent connection attempt for this session */
	u32 connect_seq;
	/* peer's global seq for this connection */
	u32 peer_global_seq;
};
/*
 * Frame descriptor: a tag, a segment count, and two per-segment arrays
 * (lengths and alignments), each sized CEPH_FRAME_MAX_SEGMENT_COUNT.
 */
struct ceph_frame_desc {
	int fd_tag;					/* FRAME_TAG_* */
	int fd_seg_cnt;
	int fd_lens[CEPH_FRAME_MAX_SEGMENT_COUNT];	/* logical */
	int fd_aligns[CEPH_FRAME_MAX_SEGMENT_COUNT];
};
struct page **in_enc_pages; int in_enc_page_cnt; int in_enc_resid; int in_enc_i; struct page **out_enc_pages; int out_enc_page_cnt; int out_enc_resid; int out_enc_i;
int con_mode; /* CEPH_CON_MODE_* */
void *conn_bufs[16]; int conn_buf_cnt; int data_len_remain;
struct kvec in_sign_kvecs[8]; struct kvec out_sign_kvecs[8]; int in_sign_kvec_cnt; int out_sign_kvec_cnt;
/* * A single connection with another host. * * We maintain a queue of outgoing messages, and some session state to * ensure that we can preserve the lossless, ordered delivery of * messages in the case of a TCP disconnect.
*/ struct ceph_connection { void *private;
conststruct ceph_connection_operations *ops;
struct ceph_messenger *msgr;
int state; /* CEPH_CON_S_* */
atomic_t sock_state; struct socket *sock;
unsignedlong flags; /* CEPH_CON_F_* */ constchar *error_msg; /* error message, if any */
/* out queue */ struct list_head out_queue; struct list_head out_sent; /* sending or sent but unacked */
u64 out_seq; /* last message queued for send */
u64 in_seq, in_seq_acked; /* last message received, acked */
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.