From ee06be207a5649dfeade19e9701ca386f25466ba Mon Sep 17 00:00:00 2001 From: MCMrARM Date: Tue, 3 Sep 2019 19:54:08 +0200 Subject: [PATCH] Initial data receive impl --- session.c | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- stream.c | 4 +++- stream.h | 8 +++++++ 3 files changed, 75 insertions(+), 2 deletions(-) diff --git a/session.c b/session.c index c21498b..c4f4136 100644 --- a/session.c +++ b/session.c @@ -1,8 +1,11 @@ #include "session.h" #include +#include +#include +#include +#include #include "stream.h" -#include "proto.h" static void _rxpc_session_send_settings(struct rxpc_session *s); static void _rxpc_session_root_stream_opened(struct rxpc_stream *stream); @@ -85,12 +88,72 @@ static int _rxpc_session_cb_frame_recv(nghttp2_session *session, const nghttp2_f static int _rxpc_session_cb_data_chunk_recv(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len, void *user_data) { + size_t tlen; + struct rxpc_stream *stream = (struct rxpc_stream *) + nghttp2_session_get_stream_user_data(session, stream_id); + struct rxpc_msg_header *header = (struct rxpc_msg_header *) stream->recv_header_data; #ifdef DEBUG_RAW_IO printf("rxpc: stream %d: received data chunk\n", stream_id); for (int i = 0; i < len; i++) printf("%02x ", data[i]); printf("\n"); #endif + while (len > 0) { + // Read the header if needed + if (stream->recv_header_pos < sizeof(struct rxpc_msg_header)) { + if (stream->recv_header_pos == 0 && len >= sizeof(struct rxpc_msg_header)) { // 99% of cases + *((struct rxpc_msg_header *) stream->recv_header_data) = *((struct rxpc_msg_header *) data); + data += sizeof(struct rxpc_msg_header); + len -= sizeof(struct rxpc_msg_header); + } else { + tlen = MIN(sizeof(struct rxpc_msg_header) - stream->recv_header_pos, len); + memcpy(&stream->recv_header_data[stream->recv_header_pos], data, tlen); + stream->recv_header_pos += tlen; + data += tlen; + len -= tlen; + if (stream->recv_header_pos < sizeof(struct rxpc_msg_header)) + break; // Incomplete, not enough data to fill the remaining space + } + + // Validate the header + if (header->magic != RXPC_MSG_MAGIC) { + fprintf(stderr, "rxpc: recv bad magic %" PRIx32, header->magic); + return -1; + } + if (header->length > 0x10000) { + fprintf(stderr, "rxpc: recv message too long: %" PRIu64 "\n", header->length); + return -1; + } + } + + // Read the data + if (stream->recv_data_pos == 0 && len >= header->length) { + // No need to copy the data, it's all in the buffer + assert(stream->recv_data == NULL); + stream->cbs.message(stream, header, data); + data += header->length; + len -= header->length; + } else if (len > 0) { + if (!stream->recv_data) + stream->recv_data = malloc(header->length); + tlen = MIN(len, header->length - stream->recv_data_pos); + memcpy(&stream->recv_data[stream->recv_data_pos], header, tlen); + stream->recv_data_pos += tlen; + data += tlen; + len -= tlen; + + if (stream->recv_data_pos != header->length) + break; // Incomplete + + stream->cbs.message(stream, header, stream->recv_data); + } + + // Message complete, prepare for next message + stream->recv_header_pos = 0; + if (stream->recv_data) + free(stream->recv_data); + stream->recv_data_pos = 0; + } return 0; } diff --git a/stream.c b/stream.c index 34040b9..c634a4f 100644 --- a/stream.c +++ b/stream.c @@ -3,7 +3,6 @@ #include #include #include "session.h" -#include "proto.h" #include static ssize_t _rxpc_stream_read(nghttp2_session *session, int32_t stream_id, uint8_t *buf, size_t length, @@ -18,6 +17,9 @@ struct rxpc_stream *rxpc_stream_open(struct rxpc_session *session, struct rxpc_s ret->send_data_end = NULL; provider.source.ptr = ret; provider.read_callback = _rxpc_stream_read; + ret->recv_header_pos = 0; + ret->recv_data_pos = 0; + ret->recv_data = NULL; ret->id = nghttp2_submit_request(session->session, NULL, NULL, 0, &provider, ret); if (ret->id <= 0) { fprintf(stderr, "rxpc: failed to open new stream: %s\n", nghttp2_strerror(ret->id)); diff --git a/stream.h b/stream.h index b041253..cd6c9fd 100644 --- a/stream.h +++ b/stream.h @@ -4,21 +4,29 @@ #include #include #include +#include "proto.h" struct rxpc_session; struct rxpc_stream; struct rxpc_stream_pending_data; typedef void (*rxpc_stream_opened_cb)(struct rxpc_stream *); +typedef void (*rxpc_stream_message_cb)(struct rxpc_stream *, struct rxpc_msg_header *header, const void *data); struct rxpc_stream_callbacks { rxpc_stream_opened_cb opened; + rxpc_stream_message_cb message; }; struct rxpc_stream { struct rxpc_session *session; int32_t id; struct rxpc_stream_callbacks cbs; struct rxpc_stream_pending_data *send_data_begin, *send_data_end; + + char recv_header_data[sizeof(struct rxpc_msg_header)]; + size_t recv_header_pos; + char *recv_data; + size_t recv_data_pos; }; struct rxpc_stream_pending_data { struct rxpc_stream_pending_data *next;