Initial data receive impl

This commit is contained in:
MCMrARM
2019-09-03 19:54:08 +02:00
parent 21f5a850e1
commit ee06be207a
3 changed files with 75 additions and 2 deletions
+64 -1
View File
@@ -1,8 +1,11 @@
#include "session.h"
#include <stdio.h>
#include <string.h>
#include <sys/param.h>
#include <inttypes.h>
#include <assert.h>
#include "stream.h"
#include "proto.h"
static void _rxpc_session_send_settings(struct rxpc_session *s);
static void _rxpc_session_root_stream_opened(struct rxpc_stream *stream);
@@ -85,12 +88,72 @@ static int _rxpc_session_cb_frame_recv(nghttp2_session *session, const nghttp2_f
static int _rxpc_session_cb_data_chunk_recv(nghttp2_session *session, uint8_t flags, int32_t stream_id,
const uint8_t *data, size_t len, void *user_data) {
size_t tlen;
struct rxpc_stream *stream = (struct rxpc_stream *)
nghttp2_session_get_stream_user_data(session, stream_id);
struct rxpc_msg_header *header = (struct rxpc_msg_header *) stream->recv_header_data;
#ifdef DEBUG_RAW_IO
printf("rxpc: stream %d: received data chunk\n", stream_id);
for (int i = 0; i < len; i++)
printf("%02x ", data[i]);
printf("\n");
#endif
while (len > 0) {
// Read the header if needed
if (stream->recv_header_pos < sizeof(struct rxpc_msg_header)) {
if (stream->recv_header_pos == 0 && len >= sizeof(struct rxpc_msg_header)) { // 99% of cases
*((struct rxpc_msg_header *) stream->recv_header_data) = *((struct rxpc_msg_header *) data);
data += sizeof(struct rxpc_msg_header);
len -= sizeof(struct rxpc_msg_header);
} else {
tlen = MIN(sizeof(struct rxpc_msg_header) - stream->recv_header_pos, len);
memcpy(&stream->recv_header_data[stream->recv_header_pos], data, tlen);
stream->recv_header_pos += tlen;
data += tlen;
len -= tlen;
if (stream->recv_header_pos < sizeof(struct rxpc_msg_header))
break; // Incomplete, not enough data to fill the remaining space
}
// Validate the header
if (header->magic != RXPC_MSG_MAGIC) {
fprintf(stderr, "rxpc: recv bad magic %" PRIx32, header->magic);
return -1;
}
if (header->length > 0x10000) {
fprintf(stderr, "rxpc: recv message too long: %" PRIu64 "\n", header->length);
return -1;
}
}
// Read the data
if (stream->recv_data_pos == 0 && len >= header->length) {
// No need to copy the data, it's all in the buffer
assert(stream->recv_data == NULL);
stream->cbs.message(stream, header, data);
data += header->length;
len -= header->length;
} else if (len > 0) {
if (!stream->recv_data)
stream->recv_data = malloc(header->length);
tlen = MIN(len, header->length - stream->recv_data_pos);
memcpy(&stream->recv_data[stream->recv_data_pos], header, tlen);
stream->recv_data_pos += tlen;
data += tlen;
len -= tlen;
if (stream->recv_data_pos != header->length)
break; // Incomplete
stream->cbs.message(stream, header, stream->recv_data);
}
// Message complete, prepare for next message
stream->recv_header_pos = 0;
if (stream->recv_data)
free(stream->recv_data);
stream->recv_data_pos = 0;
}
return 0;
}
+3 -1
View File
@@ -3,7 +3,6 @@
#include <malloc.h>
#include <string.h>
#include "session.h"
#include "proto.h"
#include <xpc/xpc_serialization.h>
static ssize_t _rxpc_stream_read(nghttp2_session *session, int32_t stream_id, uint8_t *buf, size_t length,
@@ -18,6 +17,9 @@ struct rxpc_stream *rxpc_stream_open(struct rxpc_session *session, struct rxpc_s
ret->send_data_end = NULL;
provider.source.ptr = ret;
provider.read_callback = _rxpc_stream_read;
ret->recv_header_pos = 0;
ret->recv_data_pos = 0;
ret->recv_data = NULL;
ret->id = nghttp2_submit_request(session->session, NULL, NULL, 0, &provider, ret);
if (ret->id <= 0) {
fprintf(stderr, "rxpc: failed to open new stream: %s\n", nghttp2_strerror(ret->id));
+8
View File
@@ -4,21 +4,29 @@
#include <stdint.h>
#include <stddef.h>
#include <xpc/xpc.h>
#include "proto.h"
struct rxpc_session;
struct rxpc_stream;
struct rxpc_stream_pending_data;
typedef void (*rxpc_stream_opened_cb)(struct rxpc_stream *);
typedef void (*rxpc_stream_message_cb)(struct rxpc_stream *, struct rxpc_msg_header *header, const void *data);
struct rxpc_stream_callbacks {
rxpc_stream_opened_cb opened;
rxpc_stream_message_cb message;
};
struct rxpc_stream {
struct rxpc_session *session;
int32_t id;
struct rxpc_stream_callbacks cbs;
struct rxpc_stream_pending_data *send_data_begin, *send_data_end;
char recv_header_data[sizeof(struct rxpc_msg_header)];
size_t recv_header_pos;
char *recv_data;
size_t recv_data_pos;
};
struct rxpc_stream_pending_data {
struct rxpc_stream_pending_data *next;