Дизайн RX — дросселирование обращений к внешним системам

Я пишу брокер сообщений, который подключается к IBM MQ и папкам в файловой системе. После сбора сообщений он затем материализует их в строго типизированные классы и подключает их к RX Subjects.

Я повысил осведомленность о сообщениях, которые позволяют мне определить, какие внешние системы должны быть затронуты для их обработки, поэтому я могу выполнять запросы к наблюдаемым RX и выбирать сообщения, которые не нацелены на внешнюю систему, и т. д.

Что я хочу сделать дальше, это дросселировать сообщения от внешней системы, например:

Если бы я попадал в CRM-систему с сообщением определенного типа, и я бы решил, что хочу поразить эту систему максимум 4 одновременными вызовами, я бы обрабатывал только 4 сообщения за раз, если бы у меня было 5-е сообщение, я бы нужно дождаться завершения одного из предыдущих 4, а затем перейти к 5-му. То же самое для других типов ресурсов, таких как внешние базы данных, другие внешние веб-сервисы и т. д.

Я начал исследовать этот вопрос, и до сих пор лучшим подходом к проектированию было бы написать собственный планировщик. Недостатком является то, что мне пришлось бы писать свои собственные внутренние структуры, которые помещают сообщения в очередь внутри планировщика после их получения, и именно здесь мне не нравится такой подход.

У кого-нибудь есть лучший способ сделать это?


person David Rodrigues    schedule 16.08.2013    source источник


Ответы (3)


arrow_upward
1
arrow_downward

То, что вы описываете, кажется максимальным параллелизмом. Оператор Merge поддерживает подобные вещи.

Вам нужно будет использовать что-то вроде GroupBy, чтобы разделить ваш поток в зависимости от того, куда он идет, затем использовать Merge с максимальным параллелизмом для каждой части разделения, а затем, наконец, Merge результаты снова вместе. Что-то вроде этого:

IObservable<T> requests = ...;
requests.GroupBy(request => PickExternalSystem(request))
    .Select(group => group // group.Key is the TExternalSystem
        .Select(request => Observable.Defer(() => group.Key.ExecuteAsync(request)))
        .Merge(maxConcurrency: group.Key.MaxConcurrency))
    .Merge() // merge the results of each group back together again
    .Subscribe(result => ...);
person Brandon    schedule 17.08.2013

arrow_upward
0
arrow_downward

Вы можете изучить ReactiveUI, который включает механизм регулирования скорости запросов на обслуживание. См. http://blog.paulbetts.org/index.php/2011/01/15/reactivexaml-is-now-reactiveui-2-0/

person Jim Wooley    schedule 16.08.2013

arrow_upward
0
arrow_downward

Я также опубликовал это тот же вопрос в MSDN и получил более подробный ответ с другой реализацией оператора слияния, чтобы не происходило потери данных при изменении максимальных значений параллелизма.

person David Rodrigues    schedule 20.08.2013