Где закрыть соединение/файл/журналы с несколькими потоками?

Предположим, что для простого сценария с двумя потоками используется следующий псевдокод:

У меня есть два потока, я хотел бы вставить данные в разные таблицы в базу данных. В потоке 1 я хотел бы вставить в какую-то таблицу, в то же время я хочу вставить другие данные в поток 2. Мой вопрос заключается в том, как/где разместить connection.close(), если я помещу это в потоке 1, и он выполняется, пока поток 2 все еще обрабатывается, или наоборот, если поток 2 завершил и закрыл соединение, а поток 1 еще не завершился.

Обратите внимание, что база данных — это всего лишь пример, это может быть что угодно, например файл, регистратор и т. д.

class Thread1{
    DataBaseConnection connection;
    main(){
        threadPool = Executors.newFixedThreadPool(1);
        connection.open();
        if(ThisMightTakeSomeTime)
        threadPool.submit(new MyRunnable(connection));
        InsertDataToDataBase(Table A, Table B));
        connection.Close(); //What if thread2 isn't done yet?
    }
}

public class MyRunnable implements Runnable {
    MyRunnable(connection){}
    @override
    void Run() { ...}
    void TaskThatMayTakeWhile(){
        ...get data ...
        ...Connection.InsertToTables(table X, table Y)
    }
}

person user1529412    schedule 03.03.2014    source источник
comment
Я не читал весь код, но почему вы не используете два разных подключения к базе данных из пула? Просто закрыть один, когда он будет готов, а другой закроется, когда это будет сделано?   -  person D-Klotz    schedule 03.03.2014
comment
Потоки должны выполняться в одной транзакции (один коммит)?   -  person robermann    schedule 03.03.2014
comment
Thread.join в потоке 1 в потоке 2 или использовать защелку (java.util.cocurrent.CountdownLatch), инициализированную в потоке 1 и выпущенную потоком 2?   -  person Bruno Grieder    schedule 03.03.2014
comment
@robermann, нет, я имел в виду, что мне нужно иметь возможность одновременно писать в базу данных из двух разных потоков, разных коммитов.   -  person user1529412    schedule 03.03.2014
comment
@ D-Koltz, база данных была просто примером, в моем реальном коде это на самом деле экземпляр индексатора для поисковой системы, такой как Solr.   -  person user1529412    schedule 03.03.2014
comment
Вы уверены, что Solr является потокобезопасным?   -  person D-Klotz    schedule 03.03.2014


Ответы (3)


Мой вопрос в том, как/где разместить connection.close(),

Для начала, насколько мне известно, вы не должны не использовать одно соединение с двумя разными потоками. Каждый поток должен иметь собственное подключение к базе данных, возможно, с использованием пула подключений к базе данных, такого как Apache DBCP.

Если у вас есть несколько подключений, я бы приказал каждому потоку управлять и выпускать свое собственное соединение обратно в пул. Вы должны убедиться, что это сделано в блоке finally, чтобы убедиться, что если есть исключение базы данных, соединение все равно будет разорвано.

Если вы вынуждены иметь несколько потоков, использующих одно и то же соединение, им придется использовать synchronized, чтобы убедиться, что у них есть эксклюзивная блокировка:

 synchronized (connection) {
    // use the connection
 }

Что касается того, когда его закрыть, если он является общим, вы можете иметь общий счетчик использования (возможно, AtomicInteger) и закрыть его, когда счетчик становится равным 0. Или, как рекомендовали другие, вы можете использовать пул потоков, а затем пул потоков делается бесплатное соединение.

Обратите внимание, что база данных — это всего лишь пример, это может быть что угодно, например файл, регистратор и т. д.

Что касается более общего ответа, я всегда стараюсь отразить, где создается вещь. Если метод открывает поток, он должен иметь finally, закрывающий поток.

 public void someMethod() {
    InputStream stream = ...
    try {
         // process the stream here probably by calling other methods
    } finally {
         // stream should be closed in the same method for parity
         stream.close();
    }
 }

Исключением из этого шаблона является обработчик потока. Затем Thread должен закрыть поток или разорвать соединение в блоке finally в конце метода run() или call().

 public void serverLoopMethod() {
     while (weAcceptConnections) {
        Connection connection = accept(...);
        threadPool.submit(new ConnectionHandler(connection);
     }
 }
 ...
 private static class ConnectionHandler implements Runnable {
     private Connection connection;
     public ConnectionHandler(Connection connection) {
         this.connection = connection;
     }
     // run (or call) method executed in another thread
     public void run() {
         try {
            // work with the connection probably by calling other methods
         } finally {
              // connection is closed at the end of the thread run method
              connection.close();
         }
     }
 }
person Gray    schedule 03.03.2014
comment
Спасибо за отличные ответы. Я использовал базу данных в качестве примера, чтобы сделать его простым. Что, если соединение является одноэлементным экземпляром, и я не могу создать еще один? - person user1529412; 03.03.2014
comment
Затем вам придется synchronized блокировать его в каждом потоке @user1529412, когда вы его используете. Что касается того, когда его закрыть, у вас может быть общий счетчик использования (возможно, AtomicInteger) и закрыть его, когда счетчик становится равным 0. Или, как рекомендовали другие, вы можете использовать пул потоков, а затем пул потоков освобождается от соединения. - person Gray; 03.03.2014

Если вы запустите свой код, вполне вероятно, что соединение с базой данных будет закрыто до выполнения оператора insert и, конечно же, insert будет неудачным.

Правильные решения Если у вас есть несколько задач вставки:

  1. Используйте ExecutorService вместо Execuutor
  2. Отправить все задачи
  3. Вызовите executorService.shutdown(), он будет ждать, пока все отправленные задачи не будут выполнены.
  4. Закрыть соединение

Если вам нужно отправить только одну задачу:
вам следует закрыть соединение после Connection.InsertToTables(table X, table Y) в вашей задаче.

Подходит для обоих сценариев и рекомендуется:
у каждой задачи есть свои connection.

Пример:

class Thread1 {
        private static DataSource dataSource; // initialize it

        public static void main(String[] args){
            ExecutorService threadPool = Executors.newFixedThreadPool(1);
            threadPool.submit(new MyRunnable(dataSource));                
        }
}

class MyRunnable implements Runnable {
        private final DataSource dataSource;

    MyRunnable(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public void run() {
        Connection connection = dataSource.getConnection();
        // do something with connection
        connection.close();
    }
}
person m-szalik    schedule 03.03.2014
comment
Спасибо за ответ, я выбрал ответ Грея как правильный, потому что он немного глубже, хотя ваш ответ тоже правильный. Я проголосовал за тебя. - person user1529412; 03.03.2014

class Thread1{
    DataBaseConnection connection;
    main(){
        threadPool = Executors.newFixedThreadPool(1);
        connection.open();
        if(ThisMightTakeSomeTime)
        Future f = threadPool.submit(new MyRunnable(connection));
        InsertDataToDataBase(Table A, Table B));
        f.get(); // this will hold the program until the Thread finishes.
        connection.Close(); //What if thread2 isn't done yet?
    }
}

Будущее — это ссылка, полученная в результате вызова submit. если мы вызовем Future.get(), это заблокирует текущий поток, пока отправленный поток не завершится.

person Chamil    schedule 03.03.2014