Java - запись вызова блокировки выходного потока сокета в полнодуплексном режиме

Я пишу клиент-серверное приложение, и я хочу читать и писать в один сокет из двух разных потоков (один поток для чтения, один для записи). У меня система почти работает, но есть одна озадачивающая ошибка, которую я никак не могу понять. Чтение и запись работают совершенно независимо друг от друга, но когда я начинаю читать из Socket из OutputStream в одном потоке, все вызовы для записи в InputStream в другом потоке блокируются на неопределенный срок.

Я написал небольшую тестовую программу, чтобы быстро воспроизвести проблему и исключить как можно больше внешних переменных. Я использую ServerSocketChannel и SocketChannel java.nio для установки соединения, и я использую Socket java.io (основной сокет SocketChannel) для простоты использования с ObjectInputStream и ObjectOutputStream. Программа тестирования рассчитана на двойной запуск; при первом запуске пользователь вводит s для запуска сервера, а при втором запуске пользователь вводит c для запуска клиента.

Мой вопрос: Почему выполнение приведенной ниже программы блокируется при втором вызове objectOutput.writeObject( message ); в методе server()? (четвертая до последней строки в этом методе)

Я включил ожидаемые результаты и фактические результаты, а также то, что я думаю, что они означают ниже программного кода.

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class Main {

    private static final String IP_ADDRESS = "localhost";
    private static final int WELL_KNOWN_PORT = 4000;

    public static void main( String... args ) throws Exception {
        Scanner scanner = new Scanner( System.in );
        System.out.print( "choose (s)erver or (c)lient: " );
        char choice = scanner.nextLine().charAt( 0 );
        switch ( choice ) {
        case 's':
            server();
            break;
        case 'c':
            client();
            break;
        default:
            break;
        }
        scanner.close();
    }

    private static void server() throws Exception {

        // initialize connection

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind( new InetSocketAddress( WELL_KNOWN_PORT ) );
        System.out.println( "waiting for client to connect" );
        SocketChannel socketChannel = serverSocketChannel.accept();
        System.out.println( "client connected" );
        socketChannel.configureBlocking( true );
        while ( !socketChannel.finishConnect() )
            Thread.sleep( 100 );
        Socket socket = socketChannel.socket();
        ObjectOutput objectOutput = new ObjectOutputStream( socket.getOutputStream() );

        // write first object to stream

        Message message = new Message( 1 );
        System.out.println( "writing first object to object output stream: " + message );
        objectOutput.writeObject( message );
        System.out.println( "first object written to object output stream" );
        objectOutput.flush();
        System.out.println( "object output stream flushed" );

        // start reading in a separate thread

        new Thread( () -> {
            ObjectInput objectInput = null;
            try {
                objectInput = new ObjectInputStream( socket.getInputStream() );
            } catch ( IOException e ) {
                e.printStackTrace();
            }
            Message messageIn = null;
            try {
                System.out.println( "reading on object input stream" );
                messageIn = (Message) objectInput.readObject();
                System.out.println( "read object on object input stream: " + messageIn );
            } catch ( ClassNotFoundException | IOException e ) {
                e.printStackTrace();
            }
            System.out.println( messageIn );
        } ).start();
        Thread.sleep( 100 ); // allow time for object listening to start

        // write second object to stream

        message = new Message( 2 );
        System.out.println( "writing second object to object output stream: " + message );
        objectOutput.writeObject( message ); // this call seems to block??
        System.out.println( "second object written to object output stream" );
        objectOutput.flush();
        System.out.println( "object output stream flushed" );
    }

    private static void client() throws Exception {

        // initialize connection

        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking( true );
        socketChannel.connect( new InetSocketAddress( IP_ADDRESS, WELL_KNOWN_PORT ) );
        while ( !socketChannel.finishConnect() )
            Thread.sleep( 100 );
        Socket socket = socketChannel.socket();
        ObjectOutput objectOutput = new ObjectOutputStream( socket.getOutputStream() );
        ObjectInput objectInput = new ObjectInputStream( socket.getInputStream() );

        // read first object

        System.out.println( "reading first object on object input stream" );
        Message message = (Message) objectInput.readObject();
        System.out.println( "read first object on object input stream: " + message );

        // read second object

        System.out.println( "reading second object on object input stream" );
        message = (Message) objectInput.readObject();
        System.out.println( "read second object on object input stream: " + message );

        // write confirmation message

        message = new Message( 42 );
        System.out.println( "writing confirmation message to object output stream: " + message );
        objectOutput.writeObject( message );
        System.out.println( "confirmation message written to object output stream" );
        objectOutput.flush();
        System.out.println( "object output stream flushed" );
    }

    private static class Message implements Serializable {

        private static final long serialVersionUID = 5649798518404142034L;
        private int data;

        public Message( int data ) {
            this.data = data;
        }

        @Override
        public String toString() {
            return "" + data;
        }
    }
}

Сервер

Ожидаемый результат:

choose (s)erver or (c)lient: s
waiting for client to connect
client connected
writing first object to object output stream: 1
first object written to object output stream
object output stream flushed
reading on object input stream
writing second object to object output stream: 2
second object written to object output stream
object output stream flushed
read object on object input stream: 42

Фактический результат:

choose (s)erver or (c)lient: s
waiting for client to connect
client connected
writing first object to object output stream: 1
first object written to object output stream
object output stream flushed
reading on object input stream
writing second object to object output stream: 2

Приложение успешно отправляет первый объект, но блокирует второй на неопределенный срок. Единственная разница, которую я вижу, заключается в том, что второй вызов записи происходит, когда операция чтения выполняется в отдельном потоке. Сначала я подумал, что, возможно, Sockets не поддерживают одновременное чтение и запись из разных потоков, но мой поиск в Stack Overflow показывает, что они поддерживают эту одновременную операцию (полный дуплекс). Это основная причина, по которой меня смущает работа приведенного выше кода.

Клиент

Ожидаемый результат:

choose (s)erver or (c)lient: c
reading on object input stream
read first object on object input stream: 1
reading second object on object input stream
read second object on object input stream: 2
writing confirmation message to object output stream: 42
confirmation message written to object output stream
object output stream flushed

Фактический результат:

choose (s)erver or (c)lient: c
reading first object on object input stream
read first object on object input stream: 1
reading second object on object input stream

Это подтверждает, что первый объект был успешно отправлен и получен клиентом. Клиент, кажется, ожидает второго объекта, который никогда не отправляется сервером из-за этого странного поведения блокировки на сервере.

Заранее большое спасибо за любой совет, который может дать кто угодно. Я готов переписать свой код, если полный дуплекс легко достижим другим способом, но если есть решение, использующее вышеуказанную структуру, я бы предпочел придерживаться этого для простоты, поскольку не нужно рефакторить большие участки кода.


person M Stefan Walker    schedule 04.02.2018    source источник
comment
Я использую Java 9 в Eclipse Oxygen 1A, если это полезно!   -  person M Stefan Walker    schedule 04.02.2018


Ответы (1)


В этом коде так много ошибок, что мне придется взять его построчно:

private static void server() throws Exception {

        // initialize connection

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind( new InetSocketAddress( WELL_KNOWN_PORT ) );
        System.out.println( "waiting for client to connect" );

        SocketChannel socketChannel = serverSocketChannel.accept();

Нет ничего выше того, что «инициализирует соединение». Клиент инициализирует соединение. Этот код принимает это.

        System.out.println( "client connected" );
        socketChannel.configureBlocking( true );

Это значение по умолчанию. Вам не нужно утверждать значения по умолчанию.

        while ( !socketChannel.finishConnect() )
            Thread.sleep( 100 );

Вы не должны звонить сюда. finishConnect() предназначен для клиентов, вызвавших connect() в неблокирующем режиме. Вы сервер, вы не звонили connect() и не находитесь в неблокирующем режиме. И если вы являетесь клиентом в неблокирующем режиме, вы не должны вызывать его в цикле с спящими: вы должны использовать Selector.select() с OP_CONNECT.

        Socket socket = socketChannel.socket();
        ObjectOutput objectOutput = new ObjectOutputStream( socket.getOutputStream() );

Поскольку вы используете режим блокировки и потоки вывода, невозможно понять, почему вы вообще используете ServerSocketChannel и SocketChannel, и на самом деле это, по крайней мере, часть проблемы. Малоизвестный факт, что потоки, полученные из каналов NIO, используют синхронизацию в канале как для чтения, так и для записи, и поэтому они вообще не являются полнодуплексными, даже несмотря на то, что базовое TCP-соединение является таковым. Удалите все это и перепишите, используя ServerSocket и Socket.

        // write first object to stream

        Message message = new Message( 1 );
        System.out.println( "writing first object to object output stream: " + message );
        objectOutput.writeObject( message );
        System.out.println( "first object written to object output stream" );
        objectOutput.flush();
        System.out.println( "object output stream flushed" );

        // start reading in a separate thread

        new Thread( () -> {
            ObjectInput objectInput = null;
            try {
                objectInput = new ObjectInputStream( socket.getInputStream() );
            } catch ( IOException e ) {
                e.printStackTrace();
            }

Не пишите такой код. Код, подобный приведенному ниже, который зависит от успеха предыдущего блока try, подобного приведенному выше, должен находиться внутри этого блока try. В противном случае, например, следующий код может получить NullPointerExceptions.

            Message messageIn = null;
            try {
                System.out.println( "reading on object input stream" );
                messageIn = (Message) objectInput.readObject();
                System.out.println( "read object on object input stream: " + messageIn );
            } catch ( ClassNotFoundException | IOException e ) {
                e.printStackTrace();
            }

То же.

            System.out.println( messageIn );
        } ).start();
        Thread.sleep( 100 ); // allow time for object listening to start

        // write second object to stream

        message = new Message( 2 );
        System.out.println( "writing second object to object output stream: " + message );
        objectOutput.writeObject( message ); // this call seems to block??
        System.out.println( "second object written to object output stream" );
        objectOutput.flush();
        System.out.println( "object output stream flushed" );
    }

См. выше, почему выполнение этого в отдельных потоках не может работать в случае потоков, полученных из каналов NIO.

    private static void client() throws Exception {

        // initialize connection

        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking( true );
        socketChannel.connect( new InetSocketAddress( IP_ADDRESS, WELL_KNOWN_PORT ) );
        while ( !socketChannel.finishConnect() )
            Thread.sleep( 100 );

Последние две строки выше бессмысленны, потому что соединение установлено, потому что вы находитесь в режиме блокировки.

        Socket socket = socketChannel.socket();
        ObjectOutput objectOutput = new ObjectOutputStream( socket.getOutputStream() );
        ObjectInput objectInput = new ObjectInputStream( socket.getInputStream() );

        // read first object

        System.out.println( "reading first object on object input stream" );
        Message message = (Message) objectInput.readObject();
        System.out.println( "read first object on object input stream: " + message );

        // read second object

        System.out.println( "reading second object on object input stream" );
        message = (Message) objectInput.readObject();
        System.out.println( "read second object on object input stream: " + message );

        // write confirmation message

        message = new Message( 42 );
        System.out.println( "writing confirmation message to object output stream: " + message );
        objectOutput.writeObject( message );
        System.out.println( "confirmation message written to object output stream" );
        objectOutput.flush();
        System.out.println( "object output stream flushed" );
    }

Вы можете использовать все остальное как есть, но опять же, каналы NIO здесь бесполезны. Вы также можете использовать Socket.

person user207421    schedule 04.02.2018
comment
Спасибо за все отзывы. Все это имеет большой смысл. Изначально я начал использовать nio, потому что собирался записывать и читать байты вручную. После этого выбора я вернулся к простым сокетам для простоты использования с объектными потоками. Часть о потоках, поступающих из сокета, созданного с синхронизацией SocketChannel, — это именно то, что мне было нужно. Еще раз спасибо! - person M Stefan Walker; 04.02.2018
comment
Я могу подтвердить. Я внес все изменения, предложенные в вашем ответе, и мой код работает безупречно. Возможно, я когда-нибудь вернусь к nio за их селекторами и другими функциями, но для этого проекта в настоящее время это работает отлично. - person M Stefan Walker; 04.02.2018