NIO SocketChannel говорит, что данных нет, когда они есть (или селектор не информирует меня)

У меня есть работающий клиент-серверный аппарат, который может успешно подключаться и отправлять сообщения друг другу с помощью NIO.

Сейчас меня смущает только то, как я должен продолжать чтение, когда socketChannel.read() возвращает ноль.

У меня есть протокол, который отправляет первые 4 байта в качестве ожидаемого количества входящих байтов. Даже с такой суммой я сталкиваюсь с потенциальной проблемой.

Для тестирования. Однако бывают случаи, когда я могу прочитать что-то вроде:

5 // Read 5 bytes when calling socketChannel.read()
0 // Read 0 bytes when calling socketChannel.read() immediately after

Когда я наткнулся на ноль, я предположил, что закончил чтение и должен ждать, пока ко мне не поступят дополнительные данные.

Однако, когда я это делаю, OP_READ, похоже, не срабатывает, когда я позже снова выполняю selectNow(). Я проверил ключ, и его readyops() иinterestops() установлены на 1 (что соответствует OP_READ), но он не хочет распознавать, что пришло время снова читать.

Я обнаружил, что если я продолжу цикл чтения, я могу получить что-то вроде:

5 // socketChannel.read()
0 // socketChannel.read()
7 // socketChannel.read() (Done since I have all my bytes)
0
0
0
...

Я запутался здесь, потому что это означает одно из:

  • Там нет данных, поэтому доступный 0 является законным, но затем, когда поступают остальные данные, селектор отказывается возвращать ключ с помощью selectNow()

  • Данные все есть, но почему-то возвращает 0 при чтении.

Должен ли я перерегистрировать канал после того, как selectNow() вернет его как активный ключ? (Хотя мне не приходилось переключаться с OP_CONNECT на OP_READ... так что, думаю, нет). Я чувствую, что слепое вращение в цикле опасно и приведет к трате циклов обработки.

Мне что, просто продолжать их опрашивать? Это сбивает меня с толку, когда на самом деле срабатывает OP_READ.


person Water    schedule 24.05.2015    source источник
comment
Очень любопытный. Вам точно не придется заново регистрироваться. Разместите код.   -  person user207421    schedule 25.05.2015
comment
@EJP Это была моя вина, что я не позвонил bb.clear(); между чтением. Я обновлю код рабочим примером и прокомментирую, где это было вызвано.   -  person Water    schedule 25.05.2015


Ответы (1)


Это произошло из-за ошибки с моей стороны, когда я не вызвал .clear() для считываемого байтового буфера. Это приводит к тому, что он возвращает 0 чтения, даже если данные были переданы.

Этот пример также может быть полезен людям, которые хотят увидеть, как работает простой клиент (хотя и с очень плохой обработкой исключений). Нет никакой гарантии, что это будет работать должным образом, и, вероятно, могут возникнуть проблемы, поскольку он был разработан для быстрого и грязного теста.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Test {

    public static final int PORT = 22222;

    public static void main(String[] args) throws IOException {
        Thread s = new Thread(new Server());
        Thread c = new Thread(new Client());
        s.start();
        c.start();
    }
}

class Client implements Runnable {

    public Selector selector;

    public SocketChannel sc;

    public Client() throws IOException {
        selector = Selector.open();
    }

    @Override
    public void run() {
        try {
            sc = SocketChannel.open();
            sc.socket().setTcpNoDelay(true);
            sc.configureBlocking(false);
            SelectionKey k = sc.register(selector, SelectionKey.OP_CONNECT);
            boolean firstConnect = sc.connect(new InetSocketAddress("localhost", Test.PORT));
            if (firstConnect) {
                System.out.println("Connected on first connect, de-registering OP_CONNECT");
                k.interestOps(SelectionKey.OP_READ);
            }

            while (true) {
                int keys = selector.selectNow();
                if (keys > 0) {
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isConnectable()) {
                            boolean finishConnectResult = sc.finishConnect();
                            key.interestOps(SelectionKey.OP_READ);
                            System.out.println("Finished connection: " + finishConnectResult);
                        }

                        if (key.isReadable()) {
                            ByteBuffer bb = ByteBuffer.allocate(2);
                            int bytesRead = 0;
                            while ((bytesRead = sc.read(bb)) > 0) {
                                bb.flip();
                                System.out.println(bytesRead + " bytes read");
                                System.out.println(bb.get() + ", " + bb.get());
                                //bb.clear(); // If this is not commented, it will not be handled properly.
                            }
                            System.out.println("Last bytes read value = " + bytesRead);
                            System.exit(0);
                        }
                    }
                }

                Thread.sleep(5);
            }
        } catch (Exception e) { 
            e.printStackTrace();
            throw new RuntimeException();
        }
    }
}

class Server implements Runnable {

    public Selector selector;

    public SocketChannel sc;

    public Server() throws IOException {
        selector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(Test.PORT));
        ssc.register(selector, SelectionKey.OP_ACCEPT);
    }

    @Override
    public void run() {
        boolean notSentData = true;
        try {
            while (true) {
                int keys = selector.selectNow();
                if (keys > 0) {
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isAcceptable()) {
                            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                            sc = ssc.accept();
                            if (sc != null) {
                                sc.configureBlocking(false);
                                sc.socket().setTcpNoDelay(true); // Required in my application
                                sc.register(selector, SelectionKey.OP_WRITE);
                                System.out.println("Server accepted connection");
                            } else {
                                System.out.println("Got null connection");
                            }
                        }
                    }
                }

                if (sc != null && notSentData) {
                    ByteBuffer bb = ByteBuffer.allocate(4);
                    bb.put(new byte[]{ 1, 2, 3, -1});
                    bb.flip();
                    int wrote = sc.write(bb);
                    System.out.println("Wrote " + wrote + " bytes");
                    notSentData = false;
                }

                Thread.sleep(5);
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException();
        }
    }
}
person Water    schedule 25.05.2015
comment
Вы должны звонить compact(), а не clear()., иначе вы потеряете данные. Каждый раз вы получаете из буфера только два байта: этот код выбросит остальные. - person user207421; 25.05.2015