From 949dcd4fc3f09d44e102b3319917108879278baa Mon Sep 17 00:00:00 2001 From: zhongtao Date: Fri, 1 Dec 2023 22:30:06 +1400 Subject: [PATCH 168/181] use a timeout epoll loop to ensure complete data reception Signed-off-by: zhongtao --- src/cmd/isulad-shim/process.c | 121 +++++++--------------------------- src/cmd/isulad-shim/process.h | 1 - 2 files changed, 24 insertions(+), 98 deletions(-) diff --git a/src/cmd/isulad-shim/process.c b/src/cmd/isulad-shim/process.c index 38b3a1a1..c205302f 100644 --- a/src/cmd/isulad-shim/process.c +++ b/src/cmd/isulad-shim/process.c @@ -162,6 +162,7 @@ out: static int sync_exit_cb(int fd, uint32_t events, void *cbdata, struct epoll_descr *descr) { + epoll_loop_del_handler(descr, fd); return EPOLL_LOOP_HANDLE_CLOSE; } @@ -213,23 +214,13 @@ static int stdout_cb(int fd, uint32_t events, void *cbdata, struct epoll_descr * int r_count = 0; int w_count = 0; - if (events & EPOLLHUP) { - return EPOLL_LOOP_HANDLE_CLOSE; - } - - if (!(events & EPOLLIN)) { - return EPOLL_LOOP_HANDLE_CONTINUE; - } - (void)memset(p->buf, 0, DEFAULT_IO_COPY_BUF); - if (p->block_read) { - r_count = read_nointr(fd, p->buf, DEFAULT_IO_COPY_BUF); - } else { - r_count = read(fd, p->buf, DEFAULT_IO_COPY_BUF); - } - if (r_count <= 0) { - return EPOLL_LOOP_HANDLE_CLOSE; + r_count = read_nointr(fd, p->buf, DEFAULT_IO_COPY_BUF); + if (r_count <= 0 ) { + epoll_loop_del_handler(descr, fd); + close(fd); + return EPOLL_LOOP_HANDLE_CONTINUE; } shim_write_container_log_file(p->terminal, STDID_OUT, p->buf, r_count); @@ -255,23 +246,13 @@ static int stderr_cb(int fd, uint32_t events, void *cbdata, struct epoll_descr * int r_count = 0; int w_count = 0; - if (events & EPOLLHUP) { - return EPOLL_LOOP_HANDLE_CLOSE; - } - - if (!(events & EPOLLIN)) { - return EPOLL_LOOP_HANDLE_CONTINUE; - } - (void)memset(p->buf, 0, DEFAULT_IO_COPY_BUF); - if (p->block_read) { - r_count = read_nointr(fd, p->buf, DEFAULT_IO_COPY_BUF); - } else { - r_count = read(fd, p->buf, DEFAULT_IO_COPY_BUF); - } - if (r_count <= 0) { - return EPOLL_LOOP_HANDLE_CLOSE; + r_count = read_nointr(fd, p->buf, DEFAULT_IO_COPY_BUF); + if (r_count <= 0 ) { + epoll_loop_del_handler(descr, fd); + close(fd); + return EPOLL_LOOP_HANDLE_CONTINUE; } shim_write_container_log_file(p->terminal, STDID_ERR, p->buf, r_count); @@ -297,18 +278,11 @@ static int resize_cb(int fd, uint32_t events, void *cbdata, struct epoll_descr * int r_count = 0; int resize_fd = -1; - if (events & EPOLLHUP) { - return EPOLL_LOOP_HANDLE_CLOSE; - } - - if (!(events & EPOLLIN)) { - return EPOLL_LOOP_HANDLE_CONTINUE; - } - (void)memset(p->buf, 0, DEFAULT_IO_COPY_BUF); r_count = read_nointr(fd, p->buf, DEFAULT_IO_COPY_BUF); if (r_count <= 0) { - return EPOLL_LOOP_HANDLE_CLOSE; + close(fd); + return EPOLL_LOOP_HANDLE_CONTINUE; } resize_fd = p->recv_fd; @@ -565,24 +539,6 @@ static int open_generic_io(process_t *p, struct epoll_descr *descr) return SHIM_OK; } -static int set_non_block(int fd) -{ - int flag = -1; - int ret = SHIM_ERR; - - flag = fcntl(fd, F_GETFL, 0); - if (flag < 0) { - return SHIM_ERR; - } - - ret = fcntl(fd, F_SETFL, flag | O_NONBLOCK); - if (ret != 0) { - return SHIM_ERR; - } - - return SHIM_OK; -} - /* std_id: channel type isulad_stdio: one side of the isulad fifo file @@ -602,8 +558,6 @@ static int set_non_block(int fd) static void *io_epoll_loop(void *data) { int ret = 0; - int fd_out = -1; - int fd_err = -1; process_t *p = (process_t *)data; struct epoll_descr descr; @@ -632,49 +586,23 @@ static void *io_epoll_loop(void *data) (void)sem_post(&p->sem_mainloop); + // th frist epoll_loop will exit in the following scenarios: + // 1. Receive sync fd event + // 2. stdin fd receive EPOLLHUP event + // 3. stdin fd read failed ret = epoll_loop(&descr, -1); if (ret != 0) { write_message(ERR_MSG, "epoll loop failed"); exit(EXIT_FAILURE); } - // in order to avoid data loss, set fd non-block and read it - p->block_read = false; - if (p->state->terminal) { - fd_out = p->recv_fd; - } else { - fd_out = p->shim_io->out; - fd_err = p->shim_io->err; - } - - if (fd_out > 0) { - ret = set_non_block(fd_out); - if (ret != SHIM_OK) { - write_message(ERR_MSG, "set fd %d non_block failed:%d", fd_out, SHIM_SYS_ERR(errno)); - exit(EXIT_FAILURE); - } - - for (;;) { - ret = stdout_cb(fd_out, EPOLLIN, p, &descr); - if (ret == EPOLL_LOOP_HANDLE_CLOSE) { - break; - } - } - } - - if (fd_err > 0) { - ret = set_non_block(fd_err); - if (ret != SHIM_OK) { - write_message(ERR_MSG, "set fd %d non_block failed:%d", fd_err, SHIM_SYS_ERR(errno)); - exit(EXIT_FAILURE); - } - - for (;;) { - ret = stderr_cb(fd_err, EPOLLIN, p, &descr); - if (ret == EPOLL_LOOP_HANDLE_CLOSE) { - break; - } - } + // use a timeout epoll loop to ensure complete data reception + // th second epoll_loop will exit in the following scenarios: + // 1. both stdout fd and stderr fd failed to read + // 2. no event received within 3000 milliseconds + ret = epoll_loop(&descr, 3000); + if (ret != 0) { + write_message(ERR_MSG, "Repeat the epoll loop to ensure that all data is transferred"); } return NULL; @@ -850,7 +778,6 @@ process_t *new_process(char *id, char *bundle, char *runtime) p->bundle = bundle; p->runtime = runtime; p->state = p_state; - p->block_read = true; p->console_sock_path = NULL; p->exit_fd = -1; p->io_loop_fd = -1; diff --git a/src/cmd/isulad-shim/process.h b/src/cmd/isulad-shim/process.h index 93e01e7b..29e032b0 100644 --- a/src/cmd/isulad-shim/process.h +++ b/src/cmd/isulad-shim/process.h @@ -51,7 +51,6 @@ typedef struct process { int sync_fd; int listen_fd; int recv_fd; - bool block_read; log_terminal *terminal; stdio_t *stdio; // shim to on runtime side, in:r out/err: w stdio_t *shim_io; // shim io on isulad side, in: w out/err: r -- 2.42.0