Как присоединиться к более чем 2 регионам с помощью Apache Geode?

Я пытался запросить некоторые регионы, и мне не удалось присоединиться более чем к двум из них. Я настроил это в тесте Java, чтобы их было легче запускать, но в пульсе все равно не получается.

@Test
public void test_geode_join() throws QueryException {
    ClientCache cache = new ClientCacheFactory()
        .addPoolLocator(HOST, LOCATOR_PORT)
        .setPoolSubscriptionEnabled(true)
        .setPdxSerializer(new MyReflectionBasedAutoSerializer())
        .create();

    {
        @SuppressWarnings("unchecked")
        SelectResults<StructImpl> r = (SelectResults<StructImpl>) cache.getQueryService()
            .newQuery("SELECT itm.itemId, bx.boxId " +
                "FROM /items itm, /boxs bx " +
                "WHERE itm.boxId = bx.boxId " +
                "LIMIT 5")
            .execute();

        for (StructImpl v : r) {
            System.out.println(v);
        }
    }

    {
        @SuppressWarnings("unchecked")
        SelectResults<StructImpl> r = (SelectResults<StructImpl>) cache.getQueryService()
            .newQuery("SELECT bx.boxId, rm.roomId " +
                "FROM /boxs bx, /rooms rm " +
                "WHERE bx.roomId = rm.roomId " +
                "LIMIT 5")
            .execute();

        for (StructImpl v : r) {
            System.out.println(v);
        }
    }

    {
        // That fails
        @SuppressWarnings("unchecked")
        SelectResults<StructImpl> r = (SelectResults<StructImpl>) cache.getQueryService()
            .newQuery("SELECT itm.itemId, bx.boxId, rm.roomId " +
                "FROM /items itm, /boxs bx, /rooms rm " +
                "WHERE itm.boxId = bx.boxId " +
                "AND bx.roomId = rm.roomId " +
                "LIMIT 5")
            .execute();

        for (StructImpl v : r) {
            System.out.println(v);
        }
    }
}

Первые 2 запроса работают нормально и отвечают мгновенно, но последний запрос выполняется до тех пор, пока не истечет время ожидания. Я получаю следующие журналы

[warn 2018/02/06 17:33:17.155 CET <main> tid=0x1] Pool unexpected socket timed out on client connection=Pooled Connection to hostname:31902: Connection[hostname:31902]@1978504976)

[warn 2018/02/06 17:33:27.333 CET <main> tid=0x1] Pool unexpected socket timed out on client connection=Pooled Connection to hostname2:31902: Connection[hostname2:31902]@1620459733 attempt=2)

[warn 2018/02/06 17:33:37.588 CET <main> tid=0x1] Pool unexpected socket timed out on client connection=Pooled Connection to hostname3:31902: Connection[hostname3:31902]@422409467 attempt=3)

[warn 2018/02/06 17:33:37.825 CET <main> tid=0x1] Pool unexpected socket timed out on client connection=Pooled Connection to hostname3:31902: Connection[hostname3:31902]@422409467 attempt=3). Server unreachable: could not connect after 3 attempts

[info 2018/02/06 17:33:37.840 CET <Distributed system shutdown hook> tid=0xd] VM is exiting - shutting down distributed system

[info 2018/02/06 17:33:37.840 CET <Distributed system shutdown hook> tid=0xd] GemFireCache[id = 1839168128; isClosing = true; isShutDownAll = false; created = Tue Feb 06 17:33:05 CET 2018; server = false; copyOnRead = false; lockLease = 120; lockTimeout = 60]: Now closing.

[info 2018/02/06 17:33:37.887 CET <Distributed system shutdown hook> tid=0xd] Destroying connection pool DEFAULT

И это заканчивается сбоем

org.apache.geode.cache.client.ServerConnectivityException: Pool unexpected socket timed out on client connection=Pooled Connection to hostname3:31902: Connection[hostname3:31902]@422409467 attempt=3). Server unreachable: could not connect after 3 attempts
    at org.apache.geode.cache.client.internal.OpExecutorImpl.handleException(OpExecutorImpl.java:798)
    at org.apache.geode.cache.client.internal.OpExecutorImpl.handleException(OpExecutorImpl.java:623)
    at org.apache.geode.cache.client.internal.OpExecutorImpl.execute(OpExecutorImpl.java:174)
    at org.apache.geode.cache.client.internal.OpExecutorImpl.execute(OpExecutorImpl.java:115)
    at org.apache.geode.cache.client.internal.PoolImpl.execute(PoolImpl.java:763)
    at org.apache.geode.cache.client.internal.QueryOp.execute(QueryOp.java:58)
    at org.apache.geode.cache.client.internal.ServerProxy.query(ServerProxy.java:70)
    at org.apache.geode.cache.query.internal.DefaultQuery.executeOnServer(DefaultQuery.java:456)
    at org.apache.geode.cache.query.internal.DefaultQuery.execute(DefaultQuery.java:338)
    at org.apache.geode.cache.query.internal.DefaultQuery.execute(DefaultQuery.java:319)
    at local.test.geode.GeodeTest.test_geode_join(GeodeTest.java:226)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206)

Я пытался установить тайм-ауты на 60 секунд, но я все еще не получаю никаких результатов.

Все регионы настроены так:

 Type  |      Name       | Value
------ | --------------- | --------------------
Region | data-policy     | PERSISTENT_REPLICATE
       | disk-store-name | regionDiskStore1
       | size            | 1173
       | scope           | distributed-ack

Я что-то пропустил здесь?


person Crystark    schedule 06.02.2018    source источник


Ответы (1)


Судя по всей предоставленной информации, вы все делаете правильно. Я попытался воспроизвести в простом тесте (аналогичный тест), и тест возвращает 5 результатов. Однако, если один из предикатов не совпадает, это может привести к тому, что запрос займет намного больше времени, чтобы объединить достаточное количество строк, чтобы найти соответствующий кортеж.

Ниже приведен пример теста, в котором нет проблем, но если я изменю тест, чтобы поместить в регион 3 только портфели с идентификатором = -1. Затем тест «зависает», пытаясь найти 5 строк, соответствующих критериям поиска (он должен соединить 1000 * 1000 * 1000 строк, что занимает некоторое время). В конце концов запрос не найдет p3.ID = p1.ID. Возможно ли, что itm.boxIds просто не совпадают с box.boxId достаточно часто, поэтому поиск подходящих занимает намного больше времени?

public void testJoinMultipleReplicatePersistentRegionsWithLimitClause() throws Exception {

String regionName = "portfolio";
Cache cache = serverStarterRule.getCache();
assertNotNull(cache);
Region region1 =
    cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT).create(regionName + 1);
Region region2 =
    cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT).create(regionName + 2);
Region region3 =
    cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT).create(regionName + 3);

for ( int i = 0; i < 1000; i++) {
  Portfolio p = new Portfolio(i);
  region1.put(i, p);
  region2.put(i, p);
  region3.put(i, p);  //modify this line to region3.put(i, new Portfolio(-1)) to cause query to take longer
}

QueryService queryService = cache.getQueryService();

SelectResults results = (SelectResults) queryService
    .newQuery("select p1.ID, p2.ID, p3.ID from /portfolio1 p1, /portfolio2 p2, /portfolio3 p3 where p1.ID = p2.ID and p3.ID = p1.ID limit 5").execute();

assertEquals(5, results.size());
 }
person thejaseison    schedule 06.02.2018
comment
Спасибо за Ваш ответ. Не могли бы вы добавить, как вы создаете serverStarterRule? Я пытаюсь сделать больше тестов, но я получаю неустойчивое поведение. Также какой метод вы используете для регистрации надлежащего сериализатора для Portfolio? - person Crystark; 07.02.2018
comment
На самом деле, я, похоже, исправил свои проблемы, добавив правильные индексы. Также кажется, что случайные проблемы, которые у меня были, были связаны с тем, что тесты не очищали локальное состояние после каждого запуска, поэтому изменения, которые я делал, не всегда учитывались. - person Crystark; 07.02.2018