Threadpool с постоянными рабочими экземплярами

Я пытаюсь поставить в очередь задачи в пуле потоков, которые будут выполняться, как только рабочий освободится, я нашел различные примеры этого, но во всех случаях примеры были настроены на использование нового экземпляра Worker для каждого задания, я нужны настойчивые работники.

Я пытаюсь создать инструмент резервного копирования ftp, он у меня работает, но из-за ограничений одного соединения он работает медленно. В идеале я хочу иметь одно соединение для сканирования каталогов и создания списка файлов, а затем четырех рабочих для загрузки указанных файлов.

Вот пример моего рабочего FTP:

public class Worker implements Runnable {
  protected FTPClient _ftp;

  // Connection details
  protected String _host = "";
  protected String _user = "";
  protected String _pass = "";

  // worker status
  protected boolean _working = false;

  public Worker(String host, String user, String pass) {
    this._host = host;
    this._user = user;
    this._pass = pass;
  }

   // Check if the worker is in use
  public boolean inUse() {
    return this._working;
  }

  @Override
  public void run() {
    this._ftp = new FTPClient();
    this._connect();
  }

  // Download a file from the ftp server
  public boolean download(String base, String path, String file) {
    this._working   = true;
    boolean outcome = true;

    //create directory if not exists
    File pathDir = new File(base + path);
    if (!pathDir.exists()) {
      pathDir.mkdirs();
    }

    //download file
    try {
      OutputStream output = new FileOutputStream(base + path + file);
      this._ftp.retrieveFile(file, output);
      output.close();
    } catch (Exception e) {
      outcome = false;
    } finally {
      this._working = false;
      return outcome;
    }
  }

  // Connect to the server
  protected boolean _connect() {
    try {
      this._ftp.connect(this._host);
      this._ftp.login(this._user, this._pass);
    } catch (Exception e) {
      return false;
    }
    return this._ftp.isConnected();
  }

  // Disconnect from the server
  protected void _disconnect() {
    try {
      this._ftp.disconnect();
    } catch (Exception e) { /* do nothing */ }
  }
}

Я хочу иметь возможность вызывать Worker.download(...) для каждой задачи в очереди всякий раз, когда рабочий становится доступным, без необходимости создавать новое подключение к ftp-серверу для каждой загрузки.

Любая помощь будет оценена по достоинству, так как я никогда раньше не использовал потоки, и в данный момент я хожу по кругу.


person Matt Indeedhat Holmes    schedule 17.10.2013    source источник
comment
Почему бы не иметь пул соединений? Таким образом, рабочие процессы не связаны с соединениями, они извлекают соединения из пула и используют их для возврата. Это обычный способ программирования с ограниченными внешними ресурсами, потому что воркеру не нужно соединение все время, всегда у вас может быть больше воркеров, чем соединений...   -  person Boris the Spider    schedule 17.10.2013
comment
Возможно, вы захотите использовать ExecutorService Java   -  person Simon Forsberg    schedule 17.10.2013
comment
@BoristheSpider Вы должны ответить на свой комментарий. Я думаю, что OP должен сочетать концепции ThreadPool и ConnectionPool.   -  person Fildor    schedule 17.10.2013
comment
@Matt, пожалуйста, помните, что это Java. Пожалуйста, используйте соглашения об именах java - не нужно начинать имена со знака подчеркивания.   -  person Boris the Spider    schedule 17.10.2013


Ответы (2)


примеры были настроены на использование нового экземпляра Worker для каждого задания, мне нужны постоянные работники.

Это общий вопрос с несколькими различными решениями. Что вам нужно, так это некоторый контекст для каждого потока, а не для каждого Runnable или Callable, который будет отправляться в ExecutorService.

Одним из вариантов было бы иметь ThreadLocal, который создал бы ваши ftp экземпляры. Это не оптимально, потому что не будет простого способа закрыть ftp-соединение, когда поток завершится. Затем вы ограничите количество подключений, ограничив количество потоков, работающих в вашем пуле потоков.

Я думаю, что лучшим решением было бы использовать ExecutorService только для разветвления ваших рабочих потоков. Для каждого работника введите в него BlockingQueue, который все они используют для удаления из очереди и выполнения задач, которые им необходимо выполнить. Это отдельно от очереди, используемой внутри ExecutorService. Затем вы должны добавить задачи в свою очередь, а не в саму ExecutorService.

private static final BlockingQueue<FtpTask> taskQueue
        = new ArrayBlockingQueue<FtpTask>();

Таким образом, ваш объект задачи будет иметь что-то вроде:

public static class FtpTask {
     String base;
     String path;
     String file;
}

Тогда метод run() в вашем классе Worker будет делать что-то вроде:

public void run() {
    // make our permanent ftp instance
    this._ftp = new FTPClient();
    // connect it for the life of this thread
    this._connect();
    try {
        // loop getting tasks until we are interrupted
        // could also use volatile boolean !shutdown
        while (!Thread.currentThread().isInterrupted()) {
            FtpTask task = taskQueue.take();
            // if you are using a poison pill
            if (task == SHUTDOWN_TASK) {
                break;
            }
            // do the download here
            download(task.base, task.path, task.file);
        }
    } finally {
        this._disconnect();
    }
}

Опять же, вы ограничиваете количество подключений, ограничивая количество потоков, работающих в вашем пуле потоков.

В идеале я хочу иметь одно соединение для сканирования каталогов и создания списка файлов, а затем четырех рабочих для загрузки указанных файлов.

Я бы добавил Executors.newFixedThreadPool(5); и добавил один поток, который выполняет сканирование/сборку, и 4 рабочих потока, которые выполняют загрузку. Сканирующий поток помещается в BlockingQueue, в то время как рабочие потоки берутся из той же очереди.

person Gray    schedule 17.10.2013
comment
Чего мне не хватает в обоих ответах, так это требования ОП иметь ограниченное количество подключений, о чем Борис Паук говорит в своем комментарии. - person Fildor; 17.10.2013
comment
Ограниченное количество подключений исходит из количества Worker потоков, которые он разветвил. Я добавил раздел, посвященный этому @Fildor. - person Gray; 17.10.2013
comment
мне пришлось немного почитать, чтобы понять это, но я просто более или менее взломал ваш метод в своем коде, и он работает как мечта! Теперь, чтобы сделать чистую версию, которая не вызывает слезы игрушечных глаз! Спасибо - person Matt Indeedhat Holmes; 17.10.2013

Я бы предложил использовать ThreadPooleexecutor с размером ядра и максимальным размером пула в соответствии с требованиями. В этом случае также используйте связанную блокирующую очередь, которая будет выполнять ваши задачи в ней в порядке FIFO.

Как только поток (рабочий) освободится, задача будет выбрана из очереди и выполнена.

Ознакомьтесь с подробностями ThreadPoolExecutor. Дайте мне знать, если вы застряли где-нибудь в реализации ThreadPoolexecutor.

person Scientist    schedule 17.10.2013
comment
Согласен, ThreadPoolExecutor — правильный путь. Этот учебник является хорошей отправной точкой. vogella.com/articles/JavaConcurrency/article.html - person beyonddc; 17.10.2013
comment
Пожалуйста, проголосуйте/выберите в качестве ответа, если вы считаете, что сообщение выше было полезным. - person Scientist; 17.10.2013
comment
Проблема в том, что каждая задача, отправленная в TPE, будет устанавливать новое соединение, верно? Как OP может, например, сделать 100 загрузок, используя одно соединение? - person Gray; 17.10.2013