mirror of
https://github.com/t2linux/librxpc.git
synced 2026-04-30 13:53:55 -07:00
68b29b5350
i think some of the changes here might be by mrarm not me but ive forgotten
188 lines
6.8 KiB
C
188 lines
6.8 KiB
C
#include "session.h"
|
|
|
|
#include <stdio.h>
|
|
#include <string.h>
|
|
#include <sys/param.h>
|
|
#include <inttypes.h>
|
|
#include <assert.h>
|
|
#include <xpc/xpc_serialization.h>
|
|
#include <xpc/xpc_debug.h>
|
|
#include "stream.h"
|
|
|
|
static void _rxpc_session_send_settings(struct rxpc_session *s);
|
|
static void _rxpc_session_root_stream_opened(struct rxpc_stream *stream);
|
|
static void _rxpc_session_reply_stream_opened(struct rxpc_stream *stream);
|
|
|
|
void rxpc_session_init(struct rxpc_session *s) {
|
|
s->session = NULL;
|
|
s->stream_root = NULL;
|
|
s->stream_reply = NULL;
|
|
}
|
|
|
|
void rxpc_session_open(struct rxpc_session *s, nghttp2_session_callbacks *cb,
|
|
void *transport_data, struct rxpc_stream_callbacks *ready_cbs) {
|
|
struct rxpc_stream_callbacks cbs = {0};
|
|
nghttp2_option *opt;
|
|
s->transport_data = transport_data;
|
|
nghttp2_option_new(&opt);
|
|
nghttp2_option_set_no_http_messaging(opt, 1);
|
|
nghttp2_session_client_new2(&s->session, cb, s, opt);
|
|
nghttp2_option_del(opt);
|
|
_rxpc_session_send_settings(s);
|
|
|
|
s->stream_root = rxpc_stream_open(s, ready_cbs);
|
|
}
|
|
|
|
void rxpc_session_terminate(struct rxpc_session *s) {
|
|
// TODO:
|
|
}
|
|
|
|
int rxpc_session_send_pending(struct rxpc_session *s) {
|
|
int ret = nghttp2_session_send(s->session);
|
|
if (ret != 0) {
|
|
fprintf(stderr, "xrpc: session send error: %s\n", nghttp2_strerror(ret));
|
|
return -1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static void _rxpc_session_send_settings(struct rxpc_session *s) {
|
|
int rv;
|
|
nghttp2_settings_entry iv[1] = {
|
|
{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}
|
|
};
|
|
|
|
rv = nghttp2_submit_settings(s->session, NGHTTP2_FLAG_NONE, iv, 1);
|
|
if (rv != 0)
|
|
fprintf(stderr, "xrpc: could not submit settings: %s", nghttp2_strerror(rv));
|
|
}
|
|
|
|
void _rxpc_session_message_debug(struct rxpc_stream *stream, struct rxpc_msg_header *header, const void *data) {
|
|
printf("rxpc: got message { type = %u, flags = %u, msg_id = %" PRIu64 " }\n",
|
|
header->type, header->flags, header->msg_id);
|
|
if (header->length > 0) {
|
|
xpc_object_t o = xpc_deserialize(data, header->length);
|
|
xpc_debug_print_stdout(o);
|
|
printf("\n");
|
|
xpc_free(o);
|
|
}
|
|
}
|
|
|
|
static int _rxpc_session_cb_frame_recv(nghttp2_session *session, const nghttp2_frame *frame, void *user_data) {
|
|
struct rxpc_stream *stream = (struct rxpc_stream *)
|
|
nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
|
|
switch (frame->hd.type) {
|
|
case NGHTTP2_HEADERS:
|
|
//fprintf(stderr, "rxpc: headers received\n");
|
|
if (stream->cbs.opened)
|
|
stream->cbs.opened(stream);
|
|
break;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static int _rxpc_session_cb_data_chunk_recv(nghttp2_session *session, uint8_t flags, int32_t stream_id,
|
|
const uint8_t *data, size_t len, void *user_data) {
|
|
size_t tlen;
|
|
struct rxpc_stream *stream = (struct rxpc_stream *)
|
|
nghttp2_session_get_stream_user_data(session, stream_id);
|
|
struct rxpc_msg_header *header = (struct rxpc_msg_header *) stream->recv_header_data;
|
|
#ifdef DEBUG_RAW_IO
|
|
printf("rxpc: stream %d: received data chunk\n", stream_id);
|
|
for (int i = 0; i < len; i++)
|
|
printf("%02x ", data[i]);
|
|
printf("\n");
|
|
#endif
|
|
while (len > 0) {
|
|
// Read the header if needed
|
|
if (stream->recv_header_pos < sizeof(struct rxpc_msg_header)) {
|
|
if (stream->recv_header_pos == 0 && len >= sizeof(struct rxpc_msg_header)) { // 99% of cases
|
|
*((struct rxpc_msg_header *) stream->recv_header_data) = *((struct rxpc_msg_header *) data);
|
|
stream->recv_header_pos = sizeof(struct rxpc_msg_header);
|
|
data += sizeof(struct rxpc_msg_header);
|
|
len -= sizeof(struct rxpc_msg_header);
|
|
} else {
|
|
tlen = MIN(sizeof(struct rxpc_msg_header) - stream->recv_header_pos, len);
|
|
memcpy(&stream->recv_header_data[stream->recv_header_pos], data, tlen);
|
|
stream->recv_header_pos += tlen;
|
|
data += tlen;
|
|
len -= tlen;
|
|
if (stream->recv_header_pos < sizeof(struct rxpc_msg_header))
|
|
break; // Incomplete, not enough data to fill the remaining space
|
|
}
|
|
|
|
// Validate the header
|
|
if (header->magic != RXPC_MSG_MAGIC) {
|
|
fprintf(stderr, "rxpc: recv bad magic %" PRIx32, header->magic);
|
|
return -1;
|
|
}
|
|
if (header->version != RXPC_MSG_VERSION) {
|
|
fprintf(stderr, "rxpc: recv bad version %" PRIx32, header->version);
|
|
return -1;
|
|
}
|
|
if (header->length > 0x10000) {
|
|
fprintf(stderr, "rxpc: recv message too long: %" PRIu64 "\n", header->length);
|
|
return -1;
|
|
}
|
|
#ifdef DEBUG
|
|
printf("starting data read of %lli\n", header->length);
|
|
#endif
|
|
}
|
|
|
|
// Read the data
|
|
if (stream->recv_data_pos == 0 && len >= header->length) {
|
|
// No need to copy the data, it's all in the buffer
|
|
assert(stream->recv_data == NULL);
|
|
if (stream->cbs.message)
|
|
stream->cbs.message(stream, header, data);
|
|
data += header->length;
|
|
len -= header->length;
|
|
} else if (len > 0) {
|
|
if (!stream->recv_data)
|
|
stream->recv_data = malloc(header->length);
|
|
tlen = MIN(len, header->length - stream->recv_data_pos);
|
|
memcpy(&stream->recv_data[stream->recv_data_pos], data, tlen);
|
|
data += tlen;
|
|
len -= tlen;
|
|
stream->recv_data_pos += tlen;
|
|
|
|
if (stream->recv_data_pos != header->length)
|
|
break; // Incomplete
|
|
|
|
if (stream->cbs.message)
|
|
stream->cbs.message(stream, header, stream->recv_data);
|
|
}
|
|
|
|
// Message complete, prepare for next message
|
|
stream->recv_header_pos = 0;
|
|
if (stream->recv_data)
|
|
free(stream->recv_data);
|
|
stream->recv_data_pos = 0;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static int _rxpc_session_cb_stream_close(nghttp2_session *session, int32_t stream_id, nghttp2_error_code error_code,
|
|
void *user_data) {
|
|
int ret;
|
|
|
|
fprintf(stderr, "rxpc: stream %d closed with code %d\n", stream_id, error_code);
|
|
ret = nghttp2_session_terminate_session(session, NGHTTP2_NO_ERROR);
|
|
if (ret != 0)
|
|
return NGHTTP2_ERR_CALLBACK_FAILURE;
|
|
return 0;
|
|
}
|
|
|
|
|
|
nghttp2_session_callbacks *rxpc_session_create_callbacks() {
|
|
nghttp2_session_callbacks *callbacks;
|
|
|
|
nghttp2_session_callbacks_new(&callbacks);
|
|
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, _rxpc_session_cb_frame_recv);
|
|
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, _rxpc_session_cb_data_chunk_recv);
|
|
nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, _rxpc_session_cb_stream_close);
|
|
|
|
return callbacks;
|
|
}
|
|
|