Перебор списков с использованием нескольких потоков

Следующий код (показана часть) создает объекты класса DescCalculator, вычисляет дескрипторы и возвращает их в виде строковых массивов. Молекула и ArrayList объектов Descriptor передаются.

    private void calcDesc()
    {
        try
        {
        StatusPanel.setStatus("Calculating Molecular Descriptors Using CDK...\n");
        File df = new File(Settings.getCurrentDirectory() + sep + "molDesc.csv");
        FileWriter dfw = new FileWriter(df);
        LoadSDF lsdf1 = new LoadSDF(Settings.getCurrentDirectory() + sep + "marvin3D.sdf");
        List<IAtomContainer> mols3D = lsdf1.getCompounds();
        DescriptorEngine engine = new DescriptorEngine(DescriptorEngine.MOLECULAR);
        List<String> classNames = engine.getDescriptorClassNames();
        List<String> removeList = new ArrayList();
        removeList.add("org.openscience.cdk.qsar.descriptors.molecular.IPMolecularLearningDescriptor");
        classNames.removeAll(removeList);
        List<IDescriptor> instances = engine.instantiateDescriptors(classNames);
        engine.setDescriptorInstances(instances);

        List<String> headerItems = new ArrayList<String>();
        headerItems.add("CID");
        headerItems.add("MobCSA");
        for (IDescriptor descriptor : instances) {
            String[] names = descriptor.getDescriptorNames();
            headerItems.addAll(Arrays.asList(names));
        }
        ArrayList<IMolecularDescriptor> descriptors = new ArrayList();

        for (Object object : instances)
        {
        IMolecularDescriptor descriptor = (IMolecularDescriptor) object;
        String[] comps = descriptor.getSpecification().getSpecificationReference().split("#");
        descriptors.add(descriptor);
        }
        String headerLine = "";
        for (String header : headerItems) {
            headerLine = headerLine + header + ",";
        }

        dfw.append(headerLine+"\n");
        ExecutorService eservice = Executors.newFixedThreadPool(threads);
        CompletionService <List<String>> cservice = new ExecutorCompletionService <List<String>> (eservice);
        int k=0;
        for (IAtomContainer mol : mols3D)
        {
            DescCalculator dc = new DescCalculator(mol,descriptors);
            cservice.submit(dc);
            k=k+1;

        }
        for (int j=1 ; j<=k; j++)
        {
            StatusPanel.setStatus("Calculating Descriptors for Molecule "+j+"/"+compounds.size()+" Using "+threads+" Processors\n");
            List<String> dataItems = cservice.take().get();
                for (int i = 0; i < dataItems.size(); i++) {
                if (dataItems.get(i).equals("NaN")) {
                    dataItems.set(i, "NA");
                }
            }

            try {
                String dataLine = "";
                for (String data : dataItems) {
                    dataLine = dataLine + data + ",";
                }
                dfw.append(dataLine+"\n");
            } catch (Exception e) {
                System.out.println(e.toString());
            }
        }
 dfw.close();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

Внутри класса есть цикл for, который проходит по списку дескрипторов следующим образом (показана часть). Этот код запускает исключение параллельной модификации. Если я использую threads=1 или итерацию дескриптора внутри блока synchronized{}, код работает нормально, но я не получаю необходимой параллелизации. Как мне перебирать список внутри класса DesCalculator??

    public class DescCalculator implements Callable<List<String>>{

    private IAtomContainer mol = new Molecule();
    private ArrayList<IMolecularDescriptor> molDesc;

    DescCalculator(IAtomContainer mol_, ArrayList<IMolecularDescriptor> molDesc_)
    {
        this.mol = mol_;
        this.molDesc = molDesc_;
    }

    @Override
    public List<String> call() {
        List<String> dataItems = new ArrayList<String>();
        try
        {
            String title = (String) mol.getProperty("PUBCHEM_COMPOUND_CID");
            dataItems.add(title);
            //String csa = Double.toString(mobcalCSA.get(ind));
            String csa = "NA";
            dataItems.add(csa);
            int ndesc = 0;
            for (IMolecularDescriptor descriptor : molDesc) {
                descriptor.calculate(mol);
                DescriptorValue value = descriptor.calculate(mol);
                if (value.getException() != null) {
                    for (int i = 0; i < value.getNames().length; i++) {
                        dataItems.add("NA");
                    }
                    continue;
                }

                IDescriptorResult result = value.getValue();
                if (result instanceof DoubleResult) {
                    dataItems.add(String.valueOf(((DoubleResult) result).doubleValue()));
                } else if (result instanceof IntegerResult) {
                    dataItems.add(String.valueOf(((IntegerResult) result).intValue()));
                } else if (result instanceof DoubleArrayResult) {
                    for (int i = 0; i < ((DoubleArrayResult) result).length(); i++) {
                        dataItems.add(String.valueOf(((DoubleArrayResult) result).get(i)));
                    }
                } else if (result instanceof IntegerArrayResult) {
                    for (int i = 0; i < ((IntegerArrayResult) result).length(); i++) {
                        dataItems.add(String.valueOf(((IntegerArrayResult) result).get(i)));
                    }
                }

                ndesc++;
            } 

        }
        catch(Exception e)
        {
            e.printStackTrace();
        }
        return dataItems;
    }

}

Печать трассировки стека

java.util.ConcurrentModificationException
    at java.util.AbstractList$Itr.checkForComodification(AbstractList.java:372)
    at java.util.AbstractList$Itr.next(AbstractList.java:343)
    at org.openscience.cdk.ChemObject.notifyChanged(ChemObject.java:187)
    at org.openscience.cdk.ChemObject.setFlag(ChemObject.java:375)
    at org.openscience.cdk.graph.PathTools.depthFirstTargetSearch(PathTools.java:168)
    at org.openscience.cdk.graph.PathTools.depthFirstTargetSearch(PathTools.java:177)
    at org.openscience.cdk.graph.PathTools.depthFirstTargetSearch(PathTools.java:177)
    at org.openscience.cdk.graph.SpanningTree.getRing(SpanningTree.java:185)
    at org.openscience.cdk.graph.SpanningTree.getCyclicFragmentsContainer(SpanningTree.java:221)
    at org.openscience.cdk.atomtype.CDKAtomTypeMatcher.getRing(CDKAtomTypeMatcher.java:912)
    at org.openscience.cdk.atomtype.CDKAtomTypeMatcher.perceiveNitrogens(CDKAtomTypeMatcher.java:730)
    at org.openscience.cdk.atomtype.CDKAtomTypeMatcher.findMatchingAtomType(CDKAtomTypeMatcher.java:117)
    at org.openscience.cdk.tools.manipulator.AtomContainerManipulator.percieveAtomTypesAndConfigureAtoms(AtomContainerManipulator.java:719)
    at org.openscience.cdk.smiles.smarts.SMARTSQueryTool.initializeMolecule(SMARTSQueryTool.java:435)
    at org.openscience.cdk.smiles.smarts.SMARTSQueryTool.matches(SMARTSQueryTool.java:214)
    at org.openscience.cdk.smiles.smarts.SMARTSQueryTool.matches(SMARTSQueryTool.java:189)
    at org.openscience.cdk.qsar.descriptors.molecular.AcidicGroupCountDescriptor.calculate(AcidicGroupCountDescriptor.java:135)
    at edu.uconn.pharmacy.molfind.DescCalculator.call(DescCalculator.java:48)
    at edu.uconn.pharmacy.molfind.DescCalculator.call(DescCalculator.java:25)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:680)

person lochi    schedule 20.03.2012    source источник
comment
Кажется, что код неполный. Вы где-нибудь изменяете коллекцию molDesc?   -  person Tudor    schedule 21.03.2012
comment
Каким образом вы меняете molDesc внутри call()? Нужно ли, чтобы другие потоки видели эти изменения? Самое простое решение - просто сделать копию списка в конструкторе.   -  person Affe    schedule 21.03.2012
comment
@ Тюдор кажется так. OP может использовать CopyOnWriteArrayList в качестве контейнера для molDesc. Но чтобы сказать наверняка, нужно знать, как часто обновляется molDesc и правильно ли вообще обновлять molDesc во время выполнения DescCalculator#call().   -  person Victor Sorokin    schedule 21.03.2012
comment
Я нигде не модифицирую molDesc. Я также пробовал CopyOnWriteArrayList..   -  person lochi    schedule 21.03.2012
comment
тогда вы сможете просто сделать this.molDesc = new ArrayList<Thingie>(molDesc_); в своем конструкторе. Весь код, выдающий исключение, будет полезен для понимания того, что вы на самом деле пытаетесь сделать.   -  person Affe    schedule 21.03.2012
comment
Добавлен весь код и трассировка стека печати   -  person lochi    schedule 21.03.2012
comment
Похоже, проблема в том, что сами объекты дескриптора небезопасны для одновременного использования. Возможно, вам следует создать новый набор отдельных объектов-дескрипторов для каждого рабочего потока вместо их повторного использования?   -  person Affe    schedule 21.03.2012
comment
Всем спасибо.. Аффе: Вы абсолютно правы.. Объекты-дескрипторы не являются потокобезопасными.. Создание их внутри класса DescCalculator решило проблему..   -  person lochi    schedule 21.03.2012


Ответы (3)


Вы не должны получить исключение, если только вы не изменяете Collection в то же время, когда вы проходите по нему. Часто это можно сделать в 1 потоке, выполнив удаление из Collection, по которому вы выполняете итерацию, например, в for.

Но в вашем случае, я думаю, вы где-то удаляете из списка molDesc, хотя я не вижу этого в предоставленном вами образце кода. Если вам нужно удалить записи из списка, вам придется использовать какой-то другой механизм для удаления. Вы не можете изменять одну и ту же коллекцию в нескольких потоках, если только она не является synchronized.

Пара других идей:

  • Теперь уверен, что это должен быть точно такой же список. Вы можете заставить каждый поток работать с копией списка.

    DescCalculator dc =
       new DescCalculator(mol, new ArrayList<IMolecularDescriptor>(descriptors));
    
  • Вы можете просто передать Collections.synchronizedList копию molDesc, хотя я не уверен, что вы этого хотите.

    List<IMolecularDescriptor> syncedList =
        Collections.synchronizedList(descriptors);
    ...
    DescCalculator dc = new DescCalculator(mol, syncedList);
    
  • Каждый поток может хранить список элементов, которые необходимо удалить. В конце вы можете использовать Future, когда поток собран, чтобы получить удаленный список элементов и удалить их из списка в конце.

person Gray    schedule 20.03.2012

Ваше поле molDesc является общим состоянием между двумя или более потоками, которые обращаются к методу call(). Как вы сказали, единственный способ избежать CME - это обернуть итерацию в блок synchronized() (что предотвратит распараллеливание) или использовать потокобезопасную структуру, такую ​​​​как CopyOnWriteArrayList, как упоминал Виктор.

person esroberts    schedule 20.03.2012
comment
Я попробовал CopyOnWriteArrayList. molDesc ArrayList никогда не изменяется в любом случае. - person lochi; 21.03.2012

Оказывается, проблема связана с объектами дескриптора, которые не являются потокобезопасными. Я благодарю Affe за указание на это! Генерация их внутри класса DescCalculator устранила проблему. Я благодарю всех за их вклад!

person lochi    schedule 21.03.2012