Как передать traceIds в Hystrix Observables?

У меня есть несколько сервисов, некоторые из которых используют HystrixObservableCommand от Hystrix для вызова других сервисов, а другие используют HystrixCommand. Как передать идентификаторы трассировки из вызывающей службы в Observables в HystrixObservableCommand, а также передать их, если вызывается резервный вариант?

Все службы используют grpc-java.

Пример кода, который у меня есть:

WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub);
        String messageFromWorldService = "";
        String idFromWorldService = "";
        try {

            Greeter.GreeterReply greeterReply = worldCommand.construct().toBlocking().toFuture().get();
            messageFromWorldService = greeterReply.getMessage();
            idFromWorldService = greeterReply.getId();
            logger.info("Response from WorldService  -- {}, id = {}", messageFromWorldService, idFromWorldService);
        } catch (StatusRuntimeException | InterruptedException | ExecutionException e) {
            logger.warn("Exception when calling WorldService\n" +  e);
        }

WorldCommand.java

public class WorldCommand extends HystrixObservableCommand<Greeter.GreeterReply> {

    private static final Logger logger = LoggerFactory.getLogger(WorldCommand.class.getName());

    private final Greeter.GreeterRequest greeterRequest;
    private final WorldServiceGrpc.WorldServiceStub worldServiceStub;

    public WorldCommand(Greeter.GreeterRequest greeterRequest, WorldServiceGrpc.WorldServiceStub worldServiceStub) {
        super(HystrixCommandGroupKey.Factory.asKey("WorldService"));
        this.greeterRequest = greeterRequest;
        this.worldServiceStub = worldServiceStub;
    }

    @Override
    protected Observable<Greeter.GreeterReply> construct() {
        Context context = Context.current();
        return Observable.create(new Observable.OnSubscribe<Greeter.GreeterReply>() {
            @Override
            public void call(Subscriber<? super Greeter.GreeterReply> observer) {
                logger.info("In WorldCommand");
                if (!observer.isUnsubscribed()) {
                    //pass on the context, if you want only certain headers to pass on then create a new Context and attach it.
                    context.attach();
                    logger.info("In WorldCommand after attach");
                    worldServiceStub.greetWithHelloOrWorld(greeterRequest, new StreamObserver<Greeter.GreeterReply>() {
                        @Override
                        public void onNext(Greeter.GreeterReply greeterReply) {
                            logger.info("Response from WorldService  -- {}, id = {}", greeterReply.getMessage(), greeterReply.getId());
                            observer.onNext(greeterReply);
                            observer.onCompleted();
                        }

                        @Override
                        public void onError(Throwable t) {
                            logger.info("Exception from WorldService  -- {}", t);
                        }

                        @Override
                        public void onCompleted() {

                        }
                    });
                }
            }
        } ).subscribeOn(Schedulers.io());
    }

    @Override
    protected Observable<Greeter.GreeterReply> resumeWithFallback() {
        logger.info("Response from fallback");
        Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("teammate").setId("-1").build();
        return Observable.just(greeterReply);
    }

Я использую трассировку Zipkin grpc и MDCCurrentTraceContext для печати traceId и spanId в журналах.

Обе записи журнала в WorldCommand не распечатывают идентификаторы трассировки и диапазона, они вызываются в потоке RxIoScheduler.

ИЗМЕНИТЬ

Добавлен ConcurrencyStrategy, предложенный Майком.

public class CustomHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

    private static final Logger log = LoggerFactory.getLogger(CustomHystrixConcurrencyStrategy.class);

    public <T> Callable<T> wrapCallable(Callable<T> callable){
        log.info("In CustomHystrixConcurrencyStrategy: callable="+ callable.toString());
        return new ContextCallable<>(callable);
    }
}

HelloService называет два сервиса World и Team. WorldCommand - это HystrixObservableCommand, TeamCommand - это HystrixCommand.

logger.info("In the HelloService:greetWithHelloWorld");
Greeter.GreeterRequest greeterRequest = Greeter.GreeterRequest.newBuilder().setId(request.getId()).build();

//Call WorldService
ManagedChannel worldChannel = getChannel("localhost:8081", "helloService-world-client");
//Async stub instead of blockingStub
WorldServiceGrpc.WorldServiceStub worldServiceStub = WorldServiceGrpc.newStub(worldChannel);

WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub);
String messageFromWorldService = "";
String idFromWorldService = "";
try {

    Greeter.GreeterReply greeterReply = worldCommand.construct().toBlocking().toFuture().get();
    messageFromWorldService = greeterReply.getMessage();
    idFromWorldService = greeterReply.getId();
    logger.info("Response from WorldService  -- {}, id = {}", messageFromWorldService, idFromWorldService);
} catch (StatusRuntimeException | InterruptedException | ExecutionException e) {
    logger.warn("Exception when calling WorldService\n" +  e);
}

//Call TeamService
ManagedChannel teamChannel = getChannel("localhost:8082", "helloService-team-client");
TeamServiceGrpc.TeamServiceBlockingStub teamServiceStub = TeamServiceGrpc.newBlockingStub(teamChannel);
TeamCommand teamCommand = new TeamCommand(greeterRequest, teamServiceStub);

String messageFromTeamService = "";
String idFromTeamService = "";
try {
    Greeter.GreeterReply greeterReply = teamCommand.construct().toBlocking().toFuture().get();
    messageFromTeamService = greeterReply.getMessage();
    idFromTeamService = greeterReply.getId();
    logger.info("Response from TeamService  -- {}, id = {}", messageFromTeamService, idFromTeamService);
} catch (StatusRuntimeException | InterruptedException | ExecutionException e) {
    logger.warn("Exception when calling TeamService\n" +  e);
}

assert(idFromWorldService.equals(idFromTeamService));
Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("Hello" + messageFromWorldService + " from " + messageFromTeamService).setId(idFromWorldService).build();
responseObserver.onNext(greeterReply);
responseObserver.onCompleted();

PreservableContext класс

public class PreservableContexts {

    //private final TraceContext traceContext;
    private static final Logger logger = LoggerFactory.getLogger(PreservableContexts.class.getName());

    public PreservableContexts() {
        logger.info("Creating new PreservableContexts");
        //this.traceContext = TraceContextHolder.getContext();
    }

    public void set() {
       // if (traceContext != null) {
            //TraceContextHolder.setContext(traceContext);
       // }
    }

    public void clear() {
        //TraceContextHolder.clearContext();
    }

Журнал в PreservableContexts и CustomHystrixConcurrencyStrategy никогда не печатается. Я регистрирую стратегию при запуске HelloServer.

HystrixConcurrencyStrategy strategy = new CustomHystrixConcurrencyStrategy();
        HystrixPlugins.getInstance().registerConcurrencyStrategy(strategy);
        context = HystrixRequestContext.initializeContext();

ИЗМЕНИТЬ 2

Обновлено, как настроены Observables:

    ManagedChannel worldChannel = getChannel("localhost:8081", "helloService-world-client");
    //Async stub instead of blockingStub
    WorldServiceGrpc.WorldServiceStub worldServiceStub = WorldServiceGrpc.newStub(worldChannel);
    WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub);

    //Call TeamService
    ManagedChannel teamChannel = getChannel("localhost:8082", "helloService-team-client");
    TeamServiceGrpc.TeamServiceStub teamServiceStub = TeamServiceGrpc.newStub(teamChannel);
    //TeamServiceGrpc.TeamServiceBlockingStub teamServiceStub = TeamServiceGrpc.newBlockingStub(teamChannel);
    TeamCommand teamCommand = new TeamCommand(greeterRequest, teamServiceStub);

    try {
        rx.Observable<Greeter.GreeterReply> worldReplyObservable = worldCommand.observe().subscribeOn(Schedulers.computation());
        rx.Observable<Greeter.GreeterReply> teamReplyObservable = teamCommand.observe().subscribeOn(Schedulers.computation());
        Observable.zip(worldReplyObservable, teamReplyObservable, new Func2<Greeter.GreeterReply, Greeter.GreeterReply, Object>() {
            @Override
            public Object call(Greeter.GreeterReply worldReply, Greeter.GreeterReply teamReply) {
                String messageFromWorldService = worldReply.getMessage();
                String idFromWorldService = worldReply.getId();
                logger.info("Response from WorldService  -- {}, id = {}", messageFromWorldService, idFromWorldService);

                String messageFromTeamService = teamReply.getMessage();
                String idFromTeamService = teamReply.getId();
                logger.info("Response from TeamService  -- {}, id = {}", messageFromTeamService, idFromTeamService);

                assert(idFromWorldService.equals(idFromTeamService));
                Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("Hello" + messageFromWorldService + " from " + messageFromTeamService).setId(idFromWorldService).build();
                logger.info("Final response=" + greeterReply.getMessage());
                responseObserver.onNext(greeterReply);
                responseObserver.onCompleted();
                return null;
            }
        });
    } catch (StatusRuntimeException e) {
        logger.warn("Exception when calling WorldService and/or TeamService\n" +  e);
    }

У меня странная проблема: вызовы TeamCommand и WorldCommand не завершаются, так как в этом коде никогда не выполняется:

Observable.zip(worldReplyObservable, teamReplyObservable, new Func2<Greeter.GreeterReply, Greeter.GreeterReply, Object>() {
                @Override
                public Object call(Greeter.GreeterReply worldReply, Greeter.GreeterReply teamReply) {
                    String messageFromWorldService = worldReply.getMessage();

Кроме того, если есть запасной вариант, потоки Hystrix-timer больше не имеют MDC.


person user2237511    schedule 14.11.2017    source источник


Ответы (2)


Я не очень разбираюсь в hysterix, но если вы пытаетесь передать некоторую контекстную информацию, например идентификаторы трассировки, то io.grpc.Context - правильный класс для использования. Вам нужно будет вызвать context.withValue, чтобы создать новый контекст с traceID. В тех местах, где вы хотите данные, вам нужно прикрепить контекст. Также не забудьте отключить контекст, когда закончите, чего я не вижу в вашем фрагменте.

person Spencer    schedule 15.11.2017

Вам нужно использовать ...

HystrixPlugins.getInstance().registerConcurrencyStrategy(...)

... чтобы зарегистрировать пользовательский HystrixConcurrencyStrategy, который использует ваш собственный Callable ...

public class ConcurrencyStrategy extends HystrixConcurrencyStrategy {    
    @Override
    public <K> Callable<K> wrapCallable(Callable<K> c) {
        return new ContextCallable<>(c);
    }
}

... который применяет сохранение контекста вокруг схемы ...

public class ContextCallable<K> implements Callable<K> {

    private final Callable<K> callable;
    private final PreservableContexts contexts;

    public ContextCallable(Callable<K> actual) {
        this.callable = actual;
        this.contexts = new PreservableContexts();
    }

    @Override
    public K call() throws Exception {
        contexts.set();
        try {
            return callable.call();
        } finally {
            contexts.clear();
        }
    }
}

... via - это вспомогательный класс, способный сохранять контекст Zipkin ...

public class PreservableContexts {

    private final TraceContext traceContext;

    public PreservableContexts() {
        this.traceContext = TraceContextHolder.getContext();
    }

    public void set() {
        if (traceContext != null) {
            TraceContextHolder.setContext(traceContext);
        }
    }

    public void clear() {
        TraceContextHolder.clearContext();
    }

}

... и позволяя простой метод добавления других контекстов, которые вы, возможно, захотите сохранить, например MDC, SecurityContext и т. Д.

person Mike    schedule 17.11.2017
comment
Я создал собственный класс ConcurrentStrategy и добавил журналы, чтобы убедиться, что он вызывается, но журналы не печатаются. Я отредактировал вопрос, так как его сложно красиво отформатировать в комментарии. Любые идеи? Кроме того, в Javadoc HystrixConcurrenyStrategy упоминается его использование с HystrixCommand, а не с HystrixObservableCommand - `` Например, каждый Callable, выполняемый HystrixCommand, будет вызывать wrapCallable (Callable), чтобы дать возможность настраиваемым реализациям украсить Callable дополнительным поведением. `` Работает ли настраиваемая ConcurrentStrategy для обоих? - person user2237511; 17.11.2017
comment
Предполагая, что вы используете стратегию изоляции потоков тогда следует применить стратегию параллелизма ... Я не уверен в изоляции семафоров но все равно не похоже, что вы выбираете это. Я только что протестировал это локально, и команды Observable запускают плагин. Я думаю, что проблема в том, как вы вызываете команды. Вместо teamCommand.construct() попробуйте teamCommand.execute() или teamCommand.queue(). - person Mike; 17.11.2017
comment
Инструкции для синхронного выполнения и Асинхронное выполнение можно найти на Как использовать. - person Mike; 17.11.2017
comment
Спасибо, Майк, после изменения вызывается настраиваемая стратегия параллелизма. В конце концов, мне действительно понадобится изоляция семафоров. Я изменил код и обновил вопрос. Итак, теперь вызовы никогда не возвращаются, а в резервных журналах нет MDC. Звонки выполняются, и мне бы хотелось иметь MDC. - person user2237511; 21.11.2017
comment
Я думаю, что сейчас мы уходим от темы исходного сообщения. Возможно, пришло время отметить исходный вопрос как ответ и поднять отдельный вопрос. Однако беглый взгляд подсказывает мне, что вам нужно подписаться на объект, возвращаемый Observable.zip(...). - person Mike; 21.11.2017
comment
Я согласен с частью Observable.zip, но стратегия HystrixConcurrency по-прежнему не применяется к потокам HystrixTimer, что и вызывает резервные копии. - person user2237511; 21.11.2017