Skip to content

Commit 0bf3b7b

Browse files
authored
fix: Restrict Orchestrator process only messages from its own group (#155)
* fix: Restrict Orchestrator process only messages from its own group * docs: version up 0.4.9 -> 0.5.0
1 parent 6a2baea commit 0bf3b7b

14 files changed

+89
-40
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
55
<br>
66

7-
![version 0.4.9](https://img.shields.io/badge/version-0.4.9-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
7+
![version 0.5.0](https://img.shields.io/badge/version-0.4.9-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
88
![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white)
99

1010
**TPS(6,000)** on my Macbook air m2(default options). _[link](#Test1-TPS)_

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ kotlin.code.style=official
22

33
### Project ###
44
group=org.rooftopmsa
5-
version=0.4.9
5+
version=0.5.0
66
compatibility=17
77

88
### Sonarcloud ###

src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt

Lines changed: 54 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import org.rooftop.netx.engine.listen.*
66
import reactor.core.publisher.Mono
77

88
internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> private constructor(
9+
private val group: String,
910
private val orchestratorId: String,
1011
private val orchestrateSequence: Int,
1112
private val chainContainer: ChainContainer,
@@ -56,19 +57,21 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
5657
requestHolder = chainContainer.requestHolder,
5758
resultHolder = chainContainer.resultHolder,
5859
typeReference = function.reified(),
60+
group = group,
5961
)
6062

6163
private fun <S : Any> nextOrchestrateChain(
6264
nextJoinOrchestrateListener: JoinOrchestrateListener<V, S>,
6365
nextRollbackOrchestrateListener: RollbackOrchestrateListener<V, S>?
6466
): OrchestrateChain<OriginReq, V, S> {
6567
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
66-
orchestratorId,
67-
orchestrateSequence + 1,
68-
chainContainer,
69-
nextJoinOrchestrateListener,
70-
nextRollbackOrchestrateListener,
71-
this,
68+
group = group,
69+
orchestratorId = orchestratorId,
70+
orchestrateSequence = orchestrateSequence + 1,
71+
chainContainer = chainContainer,
72+
orchestrateListener = nextJoinOrchestrateListener,
73+
rollbackOrchestrateListener = nextRollbackOrchestrateListener,
74+
beforeDefaultOrchestrateChain = this,
7275
)
7376
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain
7477

@@ -97,12 +100,13 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
97100
getMonoRollbackOrchestrateListener<V, S>(CommandType.CONTEXT, contextRollback)
98101

99102
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
100-
orchestratorId,
101-
orchestrateSequence + 1,
102-
chainContainer,
103-
nextJoinOrchestrateListener,
104-
nextRollbackOrchestrateListener,
105-
this,
103+
group = group,
104+
orchestratorId = orchestratorId,
105+
orchestrateSequence = orchestrateSequence + 1,
106+
chainContainer = chainContainer,
107+
orchestrateListener = nextJoinOrchestrateListener,
108+
rollbackOrchestrateListener = nextRollbackOrchestrateListener,
109+
beforeDefaultOrchestrateChain = this,
106110
)
107111
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain
108112

@@ -124,20 +128,22 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
124128
),
125129
requestHolder = chainContainer.requestHolder,
126130
resultHolder = chainContainer.resultHolder,
127-
function.reified(),
131+
typeReference = function.reified(),
132+
group = group,
128133
)
129134

130135
private fun <S : Any> nextOrchestrateChain(
131136
nextJoinOrchestrateListener: MonoJoinOrchestrateListener<V, S>,
132137
nextRollbackOrchestrateListener: MonoRollbackOrchestrateListener<V, S>?
133138
): OrchestrateChain<OriginReq, V, S> {
134139
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
135-
orchestratorId,
136-
orchestrateSequence + 1,
137-
chainContainer,
138-
nextJoinOrchestrateListener,
139-
nextRollbackOrchestrateListener,
140-
this,
140+
group = group,
141+
orchestratorId = orchestratorId,
142+
orchestrateSequence = orchestrateSequence + 1,
143+
chainContainer = chainContainer,
144+
orchestrateListener = nextJoinOrchestrateListener,
145+
rollbackOrchestrateListener = nextRollbackOrchestrateListener,
146+
beforeDefaultOrchestrateChain = this,
141147
)
142148
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain
143149

@@ -173,7 +179,8 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
173179
orchestrateCommand = OrchestrateCommand<T, V>(commandType, chainContainer.codec, function),
174180
resultHolder = chainContainer.resultHolder,
175181
requestHolder = chainContainer.requestHolder,
176-
function.reified(),
182+
typeReference = function.reified(),
183+
group = group,
177184
)
178185

179186
private fun <T : Any, V : Any> getRollbackOrchestrateListener(
@@ -189,6 +196,7 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
189196
requestHolder = chainContainer.requestHolder,
190197
resultHolder = chainContainer.resultHolder,
191198
typeReference = it.reified(),
199+
group = group,
192200
)
193201
}
194202

@@ -197,12 +205,13 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
197205
): Orchestrator<OriginReq, S> {
198206
return chainContainer.orchestratorCache.cache(orchestratorId) {
199207
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
200-
orchestratorId,
201-
orchestrateSequence + 1,
202-
chainContainer,
203-
nextCommitOrchestrateListener,
204-
null,
205-
this,
208+
orchestratorId = orchestratorId,
209+
orchestrateSequence = orchestrateSequence + 1,
210+
chainContainer = chainContainer,
211+
orchestrateListener = nextCommitOrchestrateListener,
212+
rollbackOrchestrateListener = null,
213+
beforeDefaultOrchestrateChain = this,
214+
group = group,
206215
)
207216
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain
208217
val firstOrchestrators = nextDefaultOrchestrateChain.initOrchestrateListeners()
@@ -241,12 +250,13 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
241250
): Orchestrator<OriginReq, S> {
242251
return chainContainer.orchestratorCache.cache(orchestratorId) {
243252
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
244-
orchestratorId,
245-
orchestrateSequence + 1,
246-
chainContainer,
247-
nextJoinOrchestrateListener,
248-
null,
249-
this,
253+
group = group,
254+
orchestratorId = orchestratorId,
255+
orchestrateSequence = orchestrateSequence + 1,
256+
chainContainer = chainContainer,
257+
orchestrateListener = nextJoinOrchestrateListener,
258+
rollbackOrchestrateListener = null,
259+
beforeDefaultOrchestrateChain = this,
250260
)
251261
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain
252262

@@ -385,6 +395,7 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
385395
resultHolder = chainContainer.resultHolder,
386396
requestHolder = chainContainer.requestHolder,
387397
typeReference = function.reified(),
398+
group = group,
388399
)
389400

390401
private fun <T : Any, V : Any> getMonoRollbackOrchestrateListener(
@@ -404,10 +415,12 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
404415
requestHolder = chainContainer.requestHolder,
405416
resultHolder = chainContainer.resultHolder,
406417
typeReference = it.reified(),
418+
group = group,
407419
)
408420
}
409421

410422
internal class Pre<T : Any> internal constructor(
423+
private val group: String,
411424
private val orchestratorId: String,
412425
private val sagaManager: SagaManager,
413426
private val sagaDispatcher: AbstractSagaDispatcher,
@@ -427,6 +440,7 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
427440
getRollbackOrchestrateListener<V>(CommandType.DEFAULT, rollback)
428441

429442
return DefaultOrchestrateChain(
443+
group = group,
430444
orchestratorId = orchestratorId,
431445
orchestrateSequence = 0,
432446
chainContainer = getStreamContainer(),
@@ -445,6 +459,7 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
445459
getRollbackOrchestrateListener<V>(CommandType.CONTEXT, contextRollback)
446460

447461
return DefaultOrchestrateChain(
462+
group = group,
448463
orchestratorId = orchestratorId,
449464
orchestrateSequence = 0,
450465
chainContainer = getStreamContainer(),
@@ -469,6 +484,7 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
469484
requestHolder = requestHolder,
470485
resultHolder = resultHolder,
471486
typeReference = function.reified(),
487+
group = group,
472488
)
473489

474490
private fun <V : Any> getRollbackOrchestrateListener(
@@ -487,7 +503,8 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
487503
),
488504
requestHolder = requestHolder,
489505
resultHolder = resultHolder,
490-
typeReference = it.reified()
506+
typeReference = it.reified(),
507+
group = group,
491508
)
492509
}
493510

@@ -501,6 +518,7 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
501518
getMonoRollbackOrchestrateListener<V>(CommandType.DEFAULT, rollback)
502519

503520
return DefaultOrchestrateChain(
521+
group = group,
504522
orchestratorId = orchestratorId,
505523
orchestrateSequence = 0,
506524
chainContainer = getStreamContainer(),
@@ -519,6 +537,7 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
519537
getMonoRollbackOrchestrateListener<V>(CommandType.CONTEXT, contextRollback)
520538

521539
return DefaultOrchestrateChain(
540+
group = group,
522541
orchestratorId = orchestratorId,
523542
orchestrateSequence = 0,
524543
chainContainer = getStreamContainer(),
@@ -543,6 +562,7 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
543562
requestHolder = requestHolder,
544563
resultHolder = resultHolder,
545564
typeReference = function.reified(),
565+
group = group,
546566
)
547567

548568
private fun <V : Any> getMonoRollbackOrchestrateListener(
@@ -562,6 +582,7 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
562582
requestHolder = requestHolder,
563583
resultHolder = resultHolder,
564584
typeReference = it.reified(),
585+
group = group,
565586
)
566587
}
567588

src/main/kotlin/org/rooftop/netx/engine/OrchestratorFactory.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import org.rooftop.netx.api.OrchestratorFactory
55
import org.rooftop.netx.core.Codec
66

77
internal class OrchestratorFactory internal constructor(
8+
private val group: String,
89
private val sagaManager: SagaManager,
910
private val sagaDispatcher: AbstractSagaDispatcher,
1011
private val codec: Codec,
@@ -26,6 +27,7 @@ internal class OrchestratorFactory internal constructor(
2627
resultHolder = resultHolder,
2728
requestHolder = requestHolder,
2829
orchestratorCache = orchestratorCache,
30+
group = group,
2931
)
3032
}
3133
}

src/main/kotlin/org/rooftop/netx/engine/listen/AbstractOrchestrateListener.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ internal abstract class AbstractOrchestrateListener<T : Any, V : Any> internal c
2020
private val requestHolder: RequestHolder,
2121
private val resultHolder: ResultHolder,
2222
private val typeReference: TypeReference<T>?,
23+
private val group: String,
2324
) {
2425

2526
var isFirst: Boolean = true
@@ -58,7 +59,7 @@ internal abstract class AbstractOrchestrateListener<T : Any, V : Any> internal c
5859
protected fun orchestrate(sagaEvent: SagaEvent): Mono<OrchestrateEvent> {
5960
return sagaEvent.startWithOrchestrateEvent()
6061
.filter {
61-
it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId
62+
it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId && sagaEvent.group == group
6263
}
6364
.mapReifiedRequest()
6465
.flatMap { (request, event) ->

src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ internal class CommitOrchestrateListener<T : Any, V : Any> internal constructor(
1616
private val resultHolder: ResultHolder,
1717
requestHolder: RequestHolder,
1818
typeReference: TypeReference<T>?,
19+
private val group: String,
1920
) : AbstractOrchestrateListener<T, V>(
2021
orchestratorId,
2122
orchestrateSequence,
@@ -24,12 +25,13 @@ internal class CommitOrchestrateListener<T : Any, V : Any> internal constructor(
2425
requestHolder,
2526
resultHolder,
2627
typeReference,
28+
group
2729
) {
2830

2931
@SagaCommitListener(OrchestrateEvent::class)
3032
fun listenCommitOrchestrateEvent(sagaCommitEvent: SagaCommitEvent): Mono<V> {
3133
return sagaCommitEvent.startWithOrchestrateEvent()
32-
.filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId }
34+
.filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId && sagaCommitEvent.group == this.group }
3335
.mapReifiedRequest()
3436
.flatMap { (request, event) ->
3537
holdRequestIfRollbackable(request, sagaCommitEvent.id)

src/main/kotlin/org/rooftop/netx/engine/listen/JoinOrchestrateListener.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ internal class JoinOrchestrateListener<T : Any, V : Any>(
1616
private val requestHolder: RequestHolder,
1717
private val resultHolder: ResultHolder,
1818
private val typeReference: TypeReference<T>?,
19+
private val group: String,
1920
) : AbstractOrchestrateListener<T, V>(
2021
orchestratorId,
2122
orchestrateSequence,
@@ -24,6 +25,7 @@ internal class JoinOrchestrateListener<T : Any, V : Any>(
2425
requestHolder,
2526
resultHolder,
2627
typeReference,
28+
group,
2729
) {
2830

2931
override fun withAnnotated(): AbstractOrchestrateListener<T, V> {
@@ -43,6 +45,7 @@ internal class JoinOrchestrateListener<T : Any, V : Any>(
4345
requestHolder,
4446
resultHolder,
4547
typeReference,
48+
group,
4649
) {
4750
@SagaJoinListener(
4851
event = OrchestrateEvent::class,
@@ -67,6 +70,7 @@ internal class JoinOrchestrateListener<T : Any, V : Any>(
6770
requestHolder,
6871
resultHolder,
6972
typeReference,
73+
group,
7074
) {
7175
@SagaJoinListener(
7276
event = OrchestrateEvent::class,

src/main/kotlin/org/rooftop/netx/engine/listen/MonoCommitOrchestrateListener.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ internal class MonoCommitOrchestrateListener<T : Any, V : Any> internal construc
1616
requestHolder: RequestHolder,
1717
private val resultHolder: ResultHolder,
1818
typeReference: TypeReference<T>?,
19+
private val group: String,
1920
) : AbstractOrchestrateListener<T, V>(
2021
orchestratorId,
2122
orchestrateSequence,
@@ -24,11 +25,12 @@ internal class MonoCommitOrchestrateListener<T : Any, V : Any> internal construc
2425
requestHolder,
2526
resultHolder,
2627
typeReference,
28+
group,
2729
) {
2830
@SagaCommitListener(OrchestrateEvent::class)
2931
fun listenCommitOrchestrateEvent(sagaCommitEvent: SagaCommitEvent): Mono<V> {
3032
return sagaCommitEvent.startWithOrchestrateEvent()
31-
.filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId }
33+
.filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId && sagaCommitEvent.group == this.group}
3234
.mapReifiedRequest()
3335
.flatMap { (request, event) ->
3436
holdRequestIfRollbackable(request, sagaCommitEvent.id)

src/main/kotlin/org/rooftop/netx/engine/listen/MonoJoinOrchestrateListener.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ internal class MonoJoinOrchestrateListener<T : Any, V : Any>(
1616
private val requestHolder: RequestHolder,
1717
private val resultHolder: ResultHolder,
1818
private val typeReference: TypeReference<T>?,
19+
private val group: String,
1920
) : AbstractOrchestrateListener<T, V>(
2021
orchestratorId,
2122
orchestrateSequence,
@@ -24,6 +25,7 @@ internal class MonoJoinOrchestrateListener<T : Any, V : Any>(
2425
requestHolder,
2526
resultHolder,
2627
typeReference,
28+
group,
2729
) {
2830

2931
override fun withAnnotated(): AbstractOrchestrateListener<T, V> {
@@ -43,6 +45,7 @@ internal class MonoJoinOrchestrateListener<T : Any, V : Any>(
4345
requestHolder,
4446
resultHolder,
4547
typeReference,
48+
group,
4649
) {
4750
@SagaJoinListener(
4851
event = OrchestrateEvent::class,
@@ -67,6 +70,7 @@ internal class MonoJoinOrchestrateListener<T : Any, V : Any>(
6770
requestHolder,
6871
resultHolder,
6972
typeReference,
73+
group,
7074
) {
7175
@SagaJoinListener(
7276
event = OrchestrateEvent::class,

0 commit comments

Comments
 (0)