Синхронизировать в DataOutputStream

Я просмотрел так много руководств по синхронизации, что у меня голова идет кругом. Я никогда не понимал этого :(.

У меня есть сервер Java (MainServer), который при подключении клиента создает новый поток (ServerThread) с DataOutputStream.

Клиент общается с ServerThread, и ServerThread отвечает. Время от времени MainServer будет рассылать сообщение всем клиентам, используя каждый объект DataOutputStream ServerThread.

Я совершенно уверен, что время от времени моя проблема связана с тем, что и MainServer, и ServerThread пытаются что-то отправить клиенту одновременно. Поэтому мне нужно заблокировать объект DataOutputStream. Для моей жизни я не могу понять эту концепцию дальше. Каждый пример, который я читал, сбивает с толку.

Каков правильный способ справиться с этим?

Метод отправки клиенту ServerThread:

public void replyToOne(String reply){
    try {
        commandOut.writeUTF(reply);
        commandOut.flush();
    } catch (IOException e) {
        logger.fatal("replyToOne", e);
    }
    logger.info(reply);
}

Метод распространения MainServer всем клиентам:

public static void distribute(String broadcastMessage){
    for (Map.Entry<String, Object[]> entry : AccountInfoList.entrySet()) {
        Object[] tmpObjArray = entry.getValue();
        DataOutputStream temporaryCOut = (DataOutputStream) tmpObjArray[INT_COMMAND_OUT]; //can be grabbed while thread is using it
        try {
            temporaryCOut.writeUTF(broadcastMessage);
            temporaryCOut.flush();
        } catch (IOException e) {
            logger.error("distribute: writeUTF", e);
        }
        logger.info(broadcastMessage);  
    }
}

Я думаю, у меня должно быть что-то подобное в моем классе ServerThread.

public synchronized DataOutputStream getCommandOut(){
    return commandOut;
}

Это действительно так просто? Я знаю, что об этом, вероятно, спрашивали и отвечали, но, похоже, я все еще не понимаю этого без индивидуальной помощи.


person KisnardOnline    schedule 09.10.2014    source источник
comment
Вот ваша проблема: Время от времени MainServer будет рассылать сообщение всем клиентам, используя каждый объект DataOutputStream ServerThread. .... ПОЧЕМУ? почему бы просто не вызвать метод экземпляра ServerThread?, а затем позволить ServerThread отправить сообщение клиенту в подходящее время....?   -  person rolfl    schedule 09.10.2014
comment
Согласитесь с @rolfl, я настоятельно рекомендую вам прочитать cs.unicam.it/ culmone/?download=java_concurrency_in_practice.pdf   -  person shazin    schedule 09.10.2014
comment
Так что это решило бы проблему, потому что ServerThread - это однопоточный доступ к DataOutputStream... хороший момент. Таскать меня за волосы здесь, неужели это так просто?   -  person KisnardOnline    schedule 09.10.2014
comment
@shazin спасибо, я хорошо прочитаю, надеюсь, это поможет мне действительно понять это. На данный момент с предложением rolfl логично избегать многопоточного доступа, но я уверен, что мне нужно знать об этом на будущее. спасибо вам обоим за помощь.   -  person KisnardOnline    schedule 09.10.2014


Ответы (3)


Если бы это был я.....

У меня будет LinkedBlockingQueue для каждого поток на стороне клиента. Затем каждый раз, когда у клиентского потока возникает момент бездействия в сокете, он проверяет очередь. Если есть сообщение для отправки из очереди, оно отправляется.

Затем сервер, если это необходимо, может просто добавить элементы в эту очередь, и, когда в соединении появится свободное место, оно будет отправлено.

Добавьте очередь, используйте метод ServerThread, например:

addBroadcastMessage(MyData data) {
    broadcastQueue.add(data);
}

а затем, на стороне сокета, есть цикл с блоком тайм-аута, чтобы он вырывался из сокета, если он простаивает, а затем просто:

while (!broadcastQueue.isEmpty()) {
    MyData data = broadcastQueue.poll();
    .... send the data....
}

и вы сделали.

LinkedBlockingQueue будет управлять блокировкой и синхронизацией для вас.

person rolfl    schedule 09.10.2014
comment
Отлично, уже переключил некоторые проблемы на это и выборочно проверил мою работу... работает как шарм. Собираюсь закончить остальное. Упростит отладку в будущем. Отправка всего из «ServerThread», назначенного этому клиенту, упростит его. Еще раз большое спасибо за помощь! - person KisnardOnline; 09.10.2014
comment
тьфу, в конце концов, это не сработает... моя команда ServerThread's commandIn.readUTF() будет заблокирована, поэтому, даже если в очередь будут добавлены элементы, она будет сначала ждать сообщений запроса клиента, прежде чем она истощит свои сообщения очереди в верхней части цикла . Есть ли способ сделать так, чтобы добавление сообщения в очередь разблокировало commandIn.readUTF()? - person KisnardOnline; 09.10.2014

Ты на правильном пути.

Каждое выражение, изменяющее DataOutputStream, должно быть synchronized в этом DataOutputStream, чтобы к нему не осуществлялся одновременный доступ (и, следовательно, не было каких-либо одновременных изменений):

public void replyToOne(String reply){
    try {
        synchronized(commandOut) {    // writing block
            commandOut.writeUTF(reply);
            commandOut.flush();
        }
    } catch (IOException e) {
        logger.fatal("replyToOne", e);
    }
    logger.info(reply);
}

И:

public static void distribute(String broadcastMessage){
    for (Map.Entry<String, Object[]> entry : AccountInfoList.entrySet()) {
        Object[] tmpObjArray = entry.getValue();
        DataOutputStream temporaryCOut = (DataOutputStream) tmpObjArray[INT_COMMAND_OUT]; //can be grabbed while thread is using it
        try {
            synchronized(temporaryCOut) {  // writing block
                temporaryCOut.writeUTF(broadcastMessage);
                temporaryCOut.flush();
            }
        } catch (IOException e) {
            logger.error("distribute: writeUTF", e);
        }
    logger.info(broadcastMessage);  
    }
}
person Jean Logeart    schedule 09.10.2014
comment
Жан, большое спасибо за понимание. Я действительно ценю это. Это помогает мне чувствовать себя немного менее потерянным. Чтобы сделать его как можно более простым, я собираюсь избегать многопоточности и просто использовать решение с очередью, поскольку в любом случае у меня есть 1 поток на каждого клиента. Тем не менее, это все еще заставляет меня чувствовать себя более уверенно в моей хватке. :) - person KisnardOnline; 09.10.2014

Просто вставлю свои 2 цента:

Я реализую серверы следующим образом:

Каждый сервер представляет собой поток, выполняющий только одну задачу: прослушивание соединений. Как только он распознает соединение, он создает новый поток для обработки ввода/вывода соединения (я называю этот подкласс ClientHandler).

Сервер также хранит список всех подключенных клиентов.

ClientHandlers отвечают за взаимодействие пользователя с сервером. Отсюда все довольно просто:

Отказ от ответственности: здесь нет блоков try-catches! добавьте их самостоятельно. Конечно, вы можете использовать исполнители потоков, чтобы ограничить количество одновременных подключений.

Метод сервера run():

@Override
public void run(){
 isRunning = true;
 while(isRunning){
  ClientHandler ch = new ClientHandler(serversocket.accept());
  clients.add(ch);
  ch.start();
 }
}

Ctor ClientHandler:

public ClientHandler(Socket client){
 out = new ObjectOutputStream(client.getOutputStream());
 in = new ObjectInputStream(client.getInputStream());
}

Метод run() ClientHandler:

@Override
public void run(){
 isConnected = true;
 while(isConnected){
  handle(in.readObject());
 }
}

и handle() метод:

private void handle(Object o){
 //Your implementation
}

Если вы хотите, чтобы для вывода использовался единый канал, вам придется синхронизировать его в соответствии с инструкциями, чтобы избежать непредвиденных результатов.

Есть 2 простых способа сделать это:

  1. Оберните каждый вызов для вывода в синхронизированный (этот) блок
  2. Используйте геттер для вывода (как и вы) с ключевым словом synchronized.
person Muli Yulzary    schedule 09.10.2014
comment
Эта установка почти идентична моей. Я просто упростил для целей моего вопроса. MyServer (мозги с потоком connectionHandler), каждое соединение порождает поток MyCommandHandler с потоками ввода/вывода для клиента. Когда MyServer и MyCommandHandler конкурировали за один и тот же DataOutputStream, у меня возникли проблемы, теперь с синхронизацией вызовов WriteUTF(), надеюсь, я больше этого не увижу. Цените свои 2 цента (положительное подтверждение)!! :D - person KisnardOnline; 09.10.2014