Skip to content

Commit fe5c17c

Browse files
zk-drizzledrizzle.zk
andauthored
Optimized the function naming (#9935)
* add isWakeCommitWhenPutMessage for AIO * optimzie the Function name Change-Id: Id91e3eb9c4488fb9804fb2c105082657e66c44c0 * optimized the function naming Change-Id: Ifc482f91220ff328e5c5425a57a04ac627e8d469 --------- Co-authored-by: drizzle.zk <drizzle.zk@alibaba-inc.com>
1 parent 02412eb commit fe5c17c

File tree

8 files changed

+22
-22
lines changed

8 files changed

+22
-22
lines changed

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -878,7 +878,7 @@ public boolean initializeMessageStore() {
878878
}
879879
if (messageStoreConfig.isTransRocksDBEnable()) {
880880
this.transMessageRocksDBStore = new TransMessageRocksDBStore(messageStore, brokerStatsManager, new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
881-
this.messageStore.setTransRocksDBStore(transMessageRocksDBStore);
881+
this.messageStore.setTransMessageRocksDBStore(transMessageRocksDBStore);
882882
}
883883
} catch (Exception e) {
884884
result = false;

broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ private void deletePrepareMessage(OperationResult result) {
203203
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(halfTopic)) {
204204
this.brokerController.getTransactionalMessageService().deletePrepareMessage(prepareMessage);
205205
} else if (this.brokerController.getMessageStoreConfig().isTransRocksDBEnable() && TopicValidator.RMQ_SYS_ROCKSDB_TRANS_HALF_TOPIC.equals(halfTopic)) {
206-
this.brokerController.getMessageStore().getTransRocksDBStore().deletePrepareMessage(prepareMessage);
206+
this.brokerController.getMessageStore().getTransMessageRocksDBStore().deletePrepareMessage(prepareMessage);
207207
} else {
208208
LOGGER.warn("deletePrepareMessage error, topic of half message is: {}, transRocksDBEnable: {}", halfTopic, this.brokerController.getMessageStoreConfig().isTransRocksDBEnable());
209209
}
@@ -287,8 +287,8 @@ private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
287287
long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
288288
msgInner.setTagsCode(tagsCodeValue);
289289
String checkTimes = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES);
290-
if (StringUtils.isEmpty(checkTimes) && this.brokerController.getMessageStoreConfig().isTransRocksDBEnable() && null != this.brokerController.getMessageStore().getTransRocksDBStore()) {
291-
Integer checkTimesRocksDB = this.brokerController.getMessageStore().getTransRocksDBStore().getCheckTimes(msgInner.getTopic(), msgInner.getTransactionId(), msgExt.getCommitLogOffset());
290+
if (StringUtils.isEmpty(checkTimes) && this.brokerController.getMessageStoreConfig().isTransRocksDBEnable() && null != this.brokerController.getMessageStore().getTransMessageRocksDBStore()) {
291+
Integer checkTimesRocksDB = this.brokerController.getMessageStore().getTransMessageRocksDBStore().getCheckTimes(msgInner.getTopic(), msgInner.getTransactionId(), msgExt.getCommitLogOffset());
292292
if (null != checkTimesRocksDB && checkTimesRocksDB >= 0) {
293293
msgExt.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, String.valueOf(checkTimesRocksDB));
294294
}

broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class TransactionalMessageRocksDBService {
5858

5959
public TransactionalMessageRocksDBService(final MessageStore messageStore, final BrokerController brokerController) {
6060
this.messageStore = messageStore;
61-
this.transMessageRocksDBStore = messageStore.getTransRocksDBStore();
61+
this.transMessageRocksDBStore = messageStore.getTransMessageRocksDBStore();
6262
this.messageRocksDBStorage = transMessageRocksDBStore.getMessageRocksDBStorage();
6363
this.brokerController = brokerController;
6464
}

store/src/main/java/org/apache/rocketmq/store/CommitLog.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,9 @@ public class CommitLog implements Swappable {
111111

112112
public CommitLog(final DefaultMessageStore messageStore) {
113113
String storePath = messageStore.getMessageStoreConfig().getStorePathCommitLog();
114-
RunningFlags runningFlags = messageStore.getMessageStoreConfig().isEnableRunningFlagsInFlush()
114+
RunningFlags runningFlags = messageStore.getMessageStoreConfig().isEnableRunningFlagsInFlush()
115115
? messageStore.getRunningFlags() : null;
116-
116+
117117
if (storePath.contains(MixAll.MULTI_PATH_SPLITTER)) {
118118
this.mappedFileQueue = new MultiPathMappedFileQueue(messageStore.getMessageStoreConfig(),
119119
messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
@@ -927,8 +927,8 @@ private boolean isMappedFileMatchedRecover(final MappedFile mappedFile,
927927

928928
private boolean isMappedFileMatchedRecover(long phyOffset, long storeTimestamp, boolean recoverNormally) throws RocksDBException {
929929
boolean result = this.defaultMessageStore.getQueueStore().isMappedFileMatchedRecover(phyOffset, storeTimestamp, recoverNormally);
930-
if (null != this.defaultMessageStore.getTransRocksDBStore() && defaultMessageStore.getMessageStoreConfig().isTransRocksDBEnable() && !defaultMessageStore.getMessageStoreConfig().isTransWriteOriginTransHalfEnable()) {
931-
result = result && this.defaultMessageStore.getTransRocksDBStore().isMappedFileMatchedRecover(phyOffset);
930+
if (null != this.defaultMessageStore.getTransMessageRocksDBStore() && defaultMessageStore.getMessageStoreConfig().isTransRocksDBEnable() && !defaultMessageStore.getMessageStoreConfig().isTransWriteOriginTransHalfEnable()) {
931+
result = result && this.defaultMessageStore.getTransMessageRocksDBStore().isMappedFileMatchedRecover(phyOffset);
932932
}
933933
if (null != this.defaultMessageStore.getIndexRocksDBStore() && defaultMessageStore.getMessageStoreConfig().isIndexRocksDBEnable()) {
934934
result = result && this.defaultMessageStore.getIndexRocksDBStore().isMappedFileMatchedRecover(phyOffset);

store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,12 +1080,12 @@ public TimerMessageStore getTimerMessageStore() {
10801080
}
10811081

10821082
@Override
1083-
public TimerMessageRocksDBStore getTimerRocksDBStore() {
1083+
public TimerMessageRocksDBStore getTimerMessageRocksDBStore() {
10841084
return this.timerMessageRocksDBStore;
10851085
}
10861086

10871087
@Override
1088-
public TransMessageRocksDBStore getTransRocksDBStore() {
1088+
public TransMessageRocksDBStore getTransMessageRocksDBStore() {
10891089
return this.transMessageRocksDBStore;
10901090
}
10911091

@@ -1100,7 +1100,7 @@ public void setTimerMessageRocksDBStore(TimerMessageRocksDBStore timerMessageRoc
11001100
}
11011101

11021102
@Override
1103-
public void setTransRocksDBStore(TransMessageRocksDBStore transMessageRocksDBStore) {
1103+
public void setTransMessageRocksDBStore(TransMessageRocksDBStore transMessageRocksDBStore) {
11041104
this.transMessageRocksDBStore = transMessageRocksDBStore;
11051105
}
11061106

store/src/main/java/org/apache/rocketmq/store/MessageStore.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,15 +210,15 @@ CompletableFuture<GetMessageResult> getMessageAsync(final String group, final St
210210

211211
TimerMessageStore getTimerMessageStore();
212212

213-
TimerMessageRocksDBStore getTimerRocksDBStore();
213+
TimerMessageRocksDBStore getTimerMessageRocksDBStore();
214214

215-
TransMessageRocksDBStore getTransRocksDBStore();
215+
TransMessageRocksDBStore getTransMessageRocksDBStore();
216216

217217
void setTimerMessageStore(TimerMessageStore timerMessageStore);
218218

219219
void setTimerMessageRocksDBStore(TimerMessageRocksDBStore timerMessageRocksDBStore);
220220

221-
void setTransRocksDBStore(TransMessageRocksDBStore transMessageRocksDBStore);
221+
void setTransMessageRocksDBStore(TransMessageRocksDBStore transMessageRocksDBStore);
222222

223223
/**
224224
* Get the offset of the message in the commit log, which is also known as physical offset.

store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ public void recover() {
318318
}
319319
currQueueOffset = Math.min(currQueueOffset, timerCheckpoint.getMasterTimerQueueOffset());
320320
if (storeConfig.isTimerRocksDBEnable()) {
321-
long commitOffsetInRocksDB = messageStore.getTimerRocksDBStore().getCommitOffsetInRocksDB();
321+
long commitOffsetInRocksDB = messageStore.getTimerMessageRocksDBStore().getCommitOffsetInRocksDB();
322322
LOGGER.info("recover time wheel, currQueueOffset: {}, commitOffsetInRocksDB: {}", currQueueOffset, commitOffsetInRocksDB);
323323
currQueueOffset = Math.max(currQueueOffset, commitOffsetInRocksDB);
324324
}
@@ -2087,12 +2087,12 @@ private void recallToTimeline(long delayTime, long offsetPy, int sizePy, Message
20872087
LOGGER.error("recallToTimeline param error, delayTime: {}, offsetPy: {}, sizePy: {}, messageExt: {}", delayTime, offsetPy, sizePy, messageExt);
20882088
return;
20892089
}
2090-
if (null == messageStore.getTimerRocksDBStore() || null == messageStore.getTimerRocksDBStore().getTimeline()) {
2090+
if (null == messageStore.getTimerMessageRocksDBStore() || null == messageStore.getTimerMessageRocksDBStore().getTimeline()) {
20912091
LOGGER.error("recallToTimeline error, timerRocksDBStore is null or timeline is null");
20922092
return;
20932093
}
20942094
try {
2095-
messageStore.getTimerRocksDBStore().getTimeline().putDeleteRecord(delayTime, messageExt.getMsgId(), offsetPy, sizePy, messageExt.getQueueOffset(), messageExt);
2095+
messageStore.getTimerMessageRocksDBStore().getTimeline().putDeleteRecord(delayTime, messageExt.getMsgId(), offsetPy, sizePy, messageExt.getQueueOffset(), messageExt);
20962096
} catch (Exception e) {
20972097
LOGGER.error("recallToTimeline error: {}", e.getMessage());
20982098
}
@@ -2109,7 +2109,7 @@ public boolean restart() {
21092109
LOGGER.info("restart TimerMessageStore has been running");
21102110
return true;
21112111
}
2112-
long commitOffsetRocksDB = this.messageStore.getTimerRocksDBStore().getCommitOffsetInRocksDB();
2112+
long commitOffsetRocksDB = this.messageStore.getTimerMessageRocksDBStore().getCommitOffsetInRocksDB();
21132113
long commitOffsetFile = this.messageStore.getTimerMessageStore().getCommitQueueOffset();
21142114
long maxCommitOffset = Math.max(commitOffsetFile, commitOffsetRocksDB);
21152115
currQueueOffset = maxCommitOffset;

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -314,12 +314,12 @@ public long getMinOffsetInQueue(String topic, int queueId) {
314314
}
315315

316316
@Override
317-
public TimerMessageRocksDBStore getTimerRocksDBStore() {
317+
public TimerMessageRocksDBStore getTimerMessageRocksDBStore() {
318318
return timerMessageRocksDBStore;
319319
}
320320

321321
@Override
322-
public TransMessageRocksDBStore getTransRocksDBStore() {
322+
public TransMessageRocksDBStore getTransMessageRocksDBStore() {
323323
return transMessageRocksDBStore;
324324
}
325325

@@ -329,7 +329,7 @@ public void setTimerMessageRocksDBStore(TimerMessageRocksDBStore timerMessageRoc
329329
}
330330

331331
@Override
332-
public void setTransRocksDBStore(TransMessageRocksDBStore transMessageRocksDBStore) {
332+
public void setTransMessageRocksDBStore(TransMessageRocksDBStore transMessageRocksDBStore) {
333333
this.transMessageRocksDBStore = transMessageRocksDBStore;
334334
}
335335

0 commit comments

Comments
 (0)