Proto commits in apache/celeborn

These commits are when the Protocol Buffers files have changed: (only the last 100 relevant commits are shown)

Commit:95f0acf
Author:zhaohehuhu
Committer:mingji

[CELEBORN-1961] Convert Resource.proto from Protocol Buffers version 2 to version 3 ### What changes were proposed in this pull request? as title ### Why are the changes needed? Upgrade PB version as fist step as per below design https://cwiki.apache.org/confluence/display/CELEBORN/CIP-16+Merge+transport+proto+and+resource+proto+files ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #3201 from zhaohehuhu/dev-0403. Authored-by: zhaohehuhu <luoyedeyi@163.com> Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>

The documentation is generated from this commit.

Commit:5e12b7d
Author:Wang, Fei

[CELEBORN-1921] Broadcast large GetReducerFileGroupResponse to prevent Spark driver network exhausted ### What changes were proposed in this pull request? For spark celeborn application, if the GetReducerFileGroupResponse is larger than the threshold, Spark driver would broadcast the GetReducerFileGroupResponse to the executors, it prevents the driver from being the bottleneck in sending out multiple copies of the GetReducerFileGroupResponse (one per executor). ### Why are the changes needed? To prevent the driver from being the bottleneck in sending out multiple copies of the GetReducerFileGroupResponse (one per executor). ### Does this PR introduce _any_ user-facing change? No, the feature is not enabled by defaults. ### How was this patch tested? UT. Cluster testing with `spark.celeborn.client.spark.shuffle.getReducerFileGroup.broadcast.enabled=true`. The broadcast response size should be always about 1kb. ![image](https://github.com/user-attachments/assets/d5d1b751-762d-43c8-8a84-0674630a5638) ![image](https://github.com/user-attachments/assets/4841a29e-5d11-4932-9fa5-f6e78b7bc521) Application succeed. ![image](https://github.com/user-attachments/assets/9b570f70-1433-4457-90ae-b8292e5476ba) Closes #3158 from turboFei/broadcast_rgf. Authored-by: Wang, Fei <fwang12@ebay.com> Signed-off-by: Wang, Fei <fwang12@ebay.com>

Commit:d659e06
Author:wangshengjie
Committer:Shuang

[CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files ### What changes were proposed in this pull request? Add logic to support avoid sorting shuffle files for Reduce mode when optimize skew partitions ### Why are the changes needed? Current logic need sorting shuffle files when read Reduce mode skew partition shuffle files, we found some shuffle sorting timeout and performance issue ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Cluster test and uts Closes #2373 from wangshengjie123/optimize-skew-partition. Lead-authored-by: wangshengjie <wangshengjie3@xiaomi.com> Co-authored-by: wangshengjie3 <wangshengjie3@xiaomi.com> Co-authored-by: Fu Chen <cfmcgrady@gmail.com> Co-authored-by: Shuang <lvshuang.tb@gmail.com> Co-authored-by: wangshengjie3 <soldier.sj.wang@gmail.com> Co-authored-by: Fei Wang <fwang12@ebay.com> Co-authored-by: Wang, Fei <fwang12@ebay.com> Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>

Commit:ad93381
Author:Wang, Fei
Committer:Shuang

[CELEBORN-1720] Prevent stage re-run if another task attempt is running or successful ### What changes were proposed in this pull request? Prevent stage re-run if another task attempt is running. If a shuffle read task can not read the shuffle data and the task another attempt is running or successful, just throw the CelebornIOException instead of FetchFailureException. The app will not failure before reach the task maxFailures. <img width="1610" alt="image" src="https://github.com/user-attachments/assets/ffc6d80e-7c90-4729-adf7-6f8c46a8f226"> ### Why are the changes needed? I met below issue because I set the wrong parameters, I should set `spark.celeborn.data.io.connectTime=30s` but set the `spark.celeborn.data.io.connectionTime=30s`, and the Disk IO Utils was high at that time. 0. speculation is enabled 1. one task failed to fetch shuffle 0 in stage 5. 2. then it triggered the stage 0 re-run (stage 4) 3. then stage 5 retry, however, no task run in stage 5 (retry 1) <img width="1212" alt="image" src="https://github.com/user-attachments/assets/555f36b0-0f0d-452d-af0b-1573601165e2"> 4. because the speculation task succeeded, so no task in stage 5(retry 1) <img width="1715" alt="image" src="https://github.com/user-attachments/assets/7f315149-1d5c-4c32-ae9b-87b099b3297f"> Due the stage re-run is heavy, so I wonder that, we should ignore the shuffle fetch failure, if there is another task attempt running. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UT for the SparkUtils method only, due it is impossible to add UT for speculation. https://github.com/apache/spark/blob/d5da49d56d7dec5f8a96c5252384d865f7efd4d9/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L236-L244 <img width="867" alt="image" src="https://github.com/user-attachments/assets/f93bd14f-0f34-4c81-a8db-13be511405d9"> For local master, it would not start the speculationScheduler. https://github.com/apache/spark/blob/d5da49d56d7dec5f8a96c5252384d865f7efd4d9/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L322-L346 <img width="1010" alt="image" src="https://github.com/user-attachments/assets/477729a4-2fc1-47e9-b128-522c6e2ceb48"> and it is also not allowed to launch speculative task on the same host. Closes #2921 from turboFei/task_id. Lead-authored-by: Wang, Fei <fwang12@ebay.com> Co-authored-by: Fei Wang <cn.feiwang@gmail.com> Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>

Commit:e41ee2d
Author:jiang13021
Committer:Shuang

[CELEBORN-1721][CIP-12] Support HARD_SPLIT in PushMergedData ### What changes were proposed in this pull request? As title. ### Why are the changes needed? https://docs.google.com/document/d/1Jaix22vME0m1Q-JtTHF9WYsrsxBWBwzwmifcPxNQZHk/edit?tab=t.0#heading=h.iadpu3t4rywi (Thanks to cfmcgrady littlexyw ErikFang waitinfuture RexXiong FMX for their efforts on the proposal) ### Does this PR introduce _any_ user-facing change? The response of pushMergedData has been modified, however, the changes are backward compatible. ### How was this patch tested? UT: org.apache.celeborn.service.deploy.cluster.PushMergedDataHardSplitSuite Closes #2924 from jiang13021/cip-12. Authored-by: jiang13021 <jiangyanze.jyz@antgroup.com> Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>

Commit:59163c2
Author:Wang, Fei
Committer:mingji

[CELEBORN-1745] Remove application top disk usage code ### What changes were proposed in this pull request? Remove the code for app top disk usage both in master and worker end. Prefer to use below prometheus expr to figure out the top app usages. ``` topk(50, sum by (applicationId) (metrics_diskBytesWritten_Value{role="worker", applicationId!=""})) ``` ### Why are the changes needed? To address comments: https://github.com/apache/celeborn/pull/2947#issuecomment-2499564978 > Due to the application dimension resource consumption, this feature should be included in the deprecated features. Maybe you can remove the codes for application top disk usage. ### Does this PR introduce _any_ user-facing change? Yes, remove the app top disk usage api. ### How was this patch tested? GA. Closes #2949 from turboFei/remove_app_top_usage. Authored-by: Wang, Fei <fwang12@ebay.com> Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>

Commit:169b6f6
Author:SteNicholas
Committer:Shuang

[CELEBORN-1685] ShuffleFallbackPolicy supports ShuffleFallbackCount metric ### What changes were proposed in this pull request? 1. `ShuffleFallbackPolicy` supports `ShuffleFallbackCount` metric to provide the shuffle fallback count of each fallback policy. 2. Introduce `ShuffleTotalCount` metric to record the total count of shuffle. 3. Fix Spark 2 does not increment shuffle count via `LifecycleManager`. ### Why are the changes needed? The implementations of `ShuffleFallbackPolicy` does not support `ShuffleFallbackCount` metric at present. Meanwhile, Bilibili production practice needs `ShuffleFallbackCount` of different `ShuffleFallbackPolicy`. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Cluster test. Closes #2891 from SteNicholas/CELEBORN-1685. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>

Commit:f1bda46
Author:Wang, Fei
Committer:mingji

[CELEBORN-1680] Introduce ShuffleFallbackCount metrics ### What changes were proposed in this pull request? As title, introduce metrics_ShuffleFallbackCount_Value. ### Why are the changes needed? To provide the insights that how many shuffles fallback to spark built-in shuffle service. It is helpful for us to deprecate the ESS progressively. Currently, we plan to set the `celeborn.client.spark.shuffle.fallback.numPartitionsThreshold` to fallback the shuffle with too large shuffle partitions number, for example: 50k. In the future, we plan to limit the acceptable maximum shuffle partition number so that the bad job would be rejected and not impact the celeborn master health. ### Does this PR introduce _any_ user-facing change? Yes, new metrics. ### How was this patch tested? UT. <img width="1188" alt="image" src="https://github.com/user-attachments/assets/8193c12c-5dc9-4783-b64b-6a8449a1bea4"> Closes #2866 from turboFei/record_fallback. Lead-authored-by: Wang, Fei <fwang12@ebay.com> Co-authored-by: Fei Wang <cn.feiwang@gmail.com> Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>

Commit:64f201d
Author:szt
Committer:Shuang

[CELEBORN-1636][FOLLOWUP] Dynamic resources will only be utilized in case of candidates shortages ### What changes were proposed in this pull request? Follow up of [https://github.com/apache/celeborn/pull/2835] Only use dynamic resources when candidates are not enough. And change the way geting availableWorkers form heartbeat to requestSlots RPC to avoid the burden of heartbeat. ### Why are the changes needed? No ### Does this PR introduce _any_ user-facing change? Add another configuration. ### How was this patch tested? UT Closes #2852 from zaynt4606/clb1636-flu2. Authored-by: szt <zaynt4606@163.com> Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>

Commit:6ad02f1
Author:Xianming Lei
Committer:Shuang

[CELEBORN-1577][PHASE1] Storage quota should support interrupt shuffle ### What changes were proposed in this pull request? Support interrupt shuffle on client side. I will develop the following functions in order 1. Client supports interrupt shuffle 2. Master supports calculating app-level shuffle usage ### Why are the changes needed? The current storage quota logic can only limit new shuffles, and cannot limit the writing of existing shuffles. In our production environment, there is such an scenario: the cluster is small, but the user's app single shuffle is large which occupied disk resources, we want to interrupt those shuffle. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unable to test this part independently, Additional tests will be added after completing the second part. Closes #2801 from leixm/CELEBORN-1577-1. Authored-by: Xianming Lei <31424839+leixm@users.noreply.github.com> Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>

Commit:f4dc7a8
Author:Weijie Guo
Committer:Shuang

[CELEBORN-1490][CIP-6] Impl worker read process in Flink Hybrid Shuffle ### What changes were proposed in this pull request? Impl worker read process in Flink Hybrid Shuffle ### Does this PR introduce _any_ user-facing change? No Closes #2820 from reswqa/cip6-8-pr. Lead-authored-by: Weijie Guo <reswqa@163.com> Co-authored-by: codenohup <huangxu.walker@gmail.com> Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>

Commit:7685fa7
Author:szt
Committer:Shuang

[CELEBORN-1636] Client supports dynamic update of Worker resources on the server ### What changes were proposed in this pull request? Currently, the ChangePartitionManager retrieves workers from the LifeCycleManager's workerSnapshot. However, during the revival process in reallocateChangePartitionRequestSlotsFromCandidates, it does not account for newly added available workers resulting from elastic contraction and expansion. This PR addresses this issue by updating the candidate workers in the ChangePartitionManager to use the available workers reported in the heartbeat from the master. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #2835 from zaynt4606/clbdev. Authored-by: szt <zaynt4606@163.com> Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>

Commit:df01fad
Author:mingji
Committer:SteNicholas

[CELEBORN-1601] Support revise lost shuffles ### What changes were proposed in this pull request? To support revising lost shuffle IDs in a long-running job such as flink batch jobs. ### Why are the changes needed? 1. To support revise lost shuffles. 2. To add an HTTP endpoint to revise lost shuffles manually. ### Does this PR introduce _any_ user-facing change? NO. ### How was this patch tested? Cluster tests. Closes #2746 from FMX/b1600. Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com> Co-authored-by: Ethan Feng <fengmingxiao.fmx@alibaba-inc.com> Signed-off-by: SteNicholas <programgeek@163.com>

Commit:d14e9bb
Author:Sanskar Modi
Committer:mingji

[CELEBORN-1620][CIP-11] Support passing worker tags via RequestSlots message ### What changes were proposed in this pull request? Supporting passing tag expression in RequestSlots request. Clients can pass the tags using CelebornConf. Default tag configs for system/tenant/user will be suppoted in follow up PRs. ### Why are the changes needed? https://cwiki.apache.org/confluence/display/CELEBORN/CIP-11+Supporting+Tags+in+Celeborn ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing UTs passed, will add more UTs while integrating TagsManager with ConfigService. Closes #2770 from s0nskar/request-slots. Authored-by: Sanskar Modi <sanskarmodi97@gmail.com> Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>

Commit:6629be8
Author:szt
Committer:Shuang

[CELEBORN-1574] Speed up unregister shuffle by batch processing ### What changes were proposed in this pull request? In order to speed up the resource releasing,this PR Unregister shuffle in batch; ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT & Local cluster testing Closes #2701 from zaynt4606/batchUnregister. Lead-authored-by: szt <zaynt4606@163.com> Co-authored-by: Zaynt <shuaizhentao.szt@alibaba-inc.com> Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>

Commit:5d61458
Author:Weijie Guo
Committer:Shuang

[CELEBORN-1490][CIP-6] Extends message to support hybrid shuffle ### What changes were proposed in this pull request? This is the first PR to support Hybrid Shuffle. Extends message to support hybrid shuffle. ### Why are the changes needed? hybrid shuffle is a tiered storage architecture, which introduces the concept of `segment`. One segment's data selects a tier to send. Data is split into segments and sent to multiple tiers. This PR introduces segment-related message. In addition, hybrid shuffle needs to distinguish which subpartition it comes from when consuming data, so we need to extend the `SubpartitionId` field to `ReadData` (new class introduced for compatibility). ### Does this PR introduce _any_ user-facing change? no. ### How was this patch tested? no need. Closes #2714 from reswqa/cip6-1-extend-message. Authored-by: Weijie Guo <reswqa@163.com> Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>

Commit:34c6aea
Author:SteNicholas
Committer:mingji

[CELEBORN-1557] Fix totalSpace of DiskInfo for Master in HA mode ### What changes were proposed in this pull request? Fix `totalSpace` of `DiskInfo` for Master in HA mode. ### Why are the changes needed? The `totalSpace` of `DiskInfo` does not sync for Master in HA mode, which causes that the `totalSpace` is incorrect. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? `RatisMasterStatusSystemSuiteJ#testHandleRegisterWorker` Closes #2690 from SteNicholas/CELEBORN-1557. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com> (cherry picked from commit b330b550ba28e1b1d7fbe50993afd8ce5aa1fac8) Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>

Commit:b330b55
Author:SteNicholas
Committer:mingji

[CELEBORN-1557] Fix totalSpace of DiskInfo for Master in HA mode ### What changes were proposed in this pull request? Fix `totalSpace` of `DiskInfo` for Master in HA mode. ### Why are the changes needed? The `totalSpace` of `DiskInfo` does not sync for Master in HA mode, which causes that the `totalSpace` is incorrect. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? `RatisMasterStatusSystemSuiteJ#testHandleRegisterWorker` Closes #2690 from SteNicholas/CELEBORN-1557. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>

Commit:ae41cb5
Author:Wang, Fei
Committer:Shuang

[CELEBORN-1537] Support to remove workers unavailable info with RESTful api ### What changes were proposed in this pull request? In [CELEBORN-1535](https://issues.apache.org/jira/browse/CELEBORN-1535), we support to disable master workerUnavilableInfo expiration. In this PR, a new RestAPI introduced for manually remove unavailable workers. Then it can be used on demand. ### Why are the changes needed? To cleanup the works unavailable info on demand manually if we disable the expiration. ### Does this PR introduce _any_ user-facing change? Yes, a new RESTful API. ### How was this patch tested? UT. Closes #2658 from turboFei/support_cleanup. Authored-by: Wang, Fei <fwang12@ebay.com> Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>

Commit:74423fb
Author:Aravind Patnam
Committer:mingji

[CELEBORN-1549] Fix networkLocation persistence into Ratis ### What changes were proposed in this pull request? Fixing a bug where the `networkLocation` is not persisted in Ratis, and the master defaults to `DEFAULT_RACK` when it loads the snapshot. This was missed in https://github.com/apache/celeborn/pull/2367 unfortunately, and it came up during our stress testing internally. ### Why are the changes needed? Needed for custom network aware replication, so that networkLocation state is kept in snapshot file. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Updated unit test to ensure serde is correct. Closes #2669 from akpatnam25/CELEBORN-1549. Authored-by: Aravind Patnam <apatnam@linkedin.com> Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>

Commit:176ef7a
Author:Mridul Muralidharan
Committer:zky.zhoukeyong

[CELEBORN-1518] Add support for Apache Spark barrier stages Adds support for barrier stages. This involves two aspects: a) If there is a task failure when executing a barrier stage, all shuffle output for the stage attempt are discarded and ignored. b) If there is a reexecution of a barrier stage (for ex, due to child stage getting a fetch failure), all shuffle output for the previous stage attempt are discarded and ignored. This is similar to handling of indeterminate stages when `throwsFetchFailure` is `true`. Note that this is supported only when `spark.celeborn.client.spark.fetch.throwsFetchFailure` is `true` As detailed in CELEBORN-1518, Celeborn currently does not support barrier stages; which is an essential functionality in Apache Spark which is widely in use by Spark users. Enhancing Celeborn will allow its use for a wider set of Spark users. Adds ability for Celeborn to support Apache Spark Barrier stages. Existing tests, and additional tests (thanks to jiang13021 in #2609 - [see here](https://github.com/apache/celeborn/pull/2609/files#diff-e17f15fcca26ddfc412f0af159c784d72417b0f22598e1b1ebfcacd6d4c3ad35)) Closes #2639 from mridulm/fix-barrier-stage-reexecution. Lead-authored-by: Mridul Muralidharan <mridul@gmail.com> Co-authored-by: Mridul Muralidharan <mridulatgmail.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> (cherry picked from commit 3234bef81bab9aabd7fdaace3fbc1361832c9b61) Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:3234bef
Author:Mridul Muralidharan
Committer:zky.zhoukeyong

[CELEBORN-1518] Add support for Apache Spark barrier stages ### What changes were proposed in this pull request? Adds support for barrier stages. This involves two aspects: a) If there is a task failure when executing a barrier stage, all shuffle output for the stage attempt are discarded and ignored. b) If there is a reexecution of a barrier stage (for ex, due to child stage getting a fetch failure), all shuffle output for the previous stage attempt are discarded and ignored. This is similar to handling of indeterminate stages when `throwsFetchFailure` is `true`. Note that this is supported only when `spark.celeborn.client.spark.fetch.throwsFetchFailure` is `true` ### Why are the changes needed? As detailed in CELEBORN-1518, Celeborn currently does not support barrier stages; which is an essential functionality in Apache Spark which is widely in use by Spark users. Enhancing Celeborn will allow its use for a wider set of Spark users. ### Does this PR introduce _any_ user-facing change? Adds ability for Celeborn to support Apache Spark Barrier stages. ### How was this patch tested? Existing tests, and additional tests (thanks to jiang13021 in #2609 - [see here](https://github.com/apache/celeborn/pull/2609/files#diff-e17f15fcca26ddfc412f0af159c784d72417b0f22598e1b1ebfcacd6d4c3ad35)) Closes #2639 from mridulm/fix-barrier-stage-reexecution. Lead-authored-by: Mridul Muralidharan <mridul@gmail.com> Co-authored-by: Mridul Muralidharan <mridulatgmail.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:7a596bb
Author:zhaohehuhu
Committer:mingji

[CELEBORN-1469] Support writing shuffle data to OSS(S3 only) ### What changes were proposed in this pull request? as title ### Why are the changes needed? Now, Celeborn doesn't support sinking shuffle data directly to Amazon S3, which could be a limitation when we're trying to move on-premises servers to AWS and use S3 as a data sink for shuffled data. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Closes #2579 from zhaohehuhu/dev-0619. Authored-by: zhaohehuhu <luoyedeyi@163.com> Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>

Commit:999510b
Author:Xianming Lei
Committer:zky.zhoukeyong

[CELEBORN-1444] Introduce worker decommission metrics and corresponding REST API ### What changes were proposed in this pull request? Introduce worker decommission metrics and corresponding REST API. ### Why are the changes needed? In a production environment, due to certain hardware or environmental reasons, our script will automatically decommission the node. At this time, we need to distinguish between graceful shutdown nodes and decommissioned nodes. If we distinguish shutdown worker and decommission worker metrics, we can achieve better operation and maintenance. ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? - `DefaultMetaSystemSuiteJ#testHandleReportWorkerDecommission` - `RatisMasterStatusSystemSuiteJ#testHandleReportWorkerDecommission` - `ApiMasterResourceSuite#decommissionWorkers` - `ApiWorkerResourceSuite#isDecommissioning` Closes #2535 from leixm/issue_1444. Lead-authored-by: Xianming Lei <jerrylei@apache.org> Co-authored-by: Xianming Lei <31424839+leixm@users.noreply.github.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:308eed2
Author:Shuang
Committer:SteNicholas

[CELEBORN-1427] Add Capacity metrics for Celeborn ### What changes were proposed in this pull request? As title ### Why are the changes needed? The Celeborn cluster does not currently provide metrics for 'TotalCapacity' and 'TotalFreeCapacity ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA Closes #2521 from RexXiong/CELEBORN-1427. Authored-by: Shuang <lvshuang.xjs@alibaba-inc.com> Signed-off-by: SteNicholas <programgeek@163.com>

Commit:8dd33ce
Author:mingji
Committer:Shuang

[CELEBORN-1270] Introduce PbPackedPartitionLocations to (de-)serialize PartitionLocations more efficiently ### What changes were proposed in this pull request? 1. Introduces new approaches to (de-)serialize partition locations. 2. The Celeborn server remains compatible with old clients. ### Why are the changes needed? 1. Improve memory efficiency for partition locations. ### Does this PR introduce _any_ user-facing change? NO. ### How was this patch tested? 1. Pass GA. 2. Run tests on cluster: ``` val start = System.currentTimeMillis spark.sparkContext.parallelize(1 to 10000, 10000).flatMap( _ => (1 to 950000).iterator.map(num => num)).repartition(10000).count val after = System.currentTimeMillis println((after-start)/1000) ``` packed RPC time: 70,65,64,64,64,64 baseline RPC time: 69,66,66,66,67,66 I think this PR does not introduce performance overhead. 4. RPC size test: this PR can reduce PRC size by up to 60%. Closes #2456 from FMX/CELEBORN-1270. Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com> Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>

Commit:fc23800
Author:zky.zhoukeyong
Committer:Shuang

[CELEBORN-1144] Batch OpenStream RPCs ### What changes were proposed in this pull request? Batch OpenStream RPCs by Worker to avoid too many RPCs. ### Why are the changes needed? ditto ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Passes GA and Manual tests. Closes #2362 from waitinfuture/1144. Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>

Commit:ca64bb5
Author:Aravind Patnam
Committer:zky.zhoukeyong

[CELEBORN-1313] Custom Network Location Aware Replication ### What changes were proposed in this pull request? Enable custom network location aware replication, based on a custom impl of `DNSToSwitchMapping`. ### Why are the changes needed? Resolution of network location of multiple workers at master can be expensive at times. This way, each worker resolves its own network location and sends to master via the RegisterWorker transport message. If worker cannot resolve, fallback to attempting to resolve at master (during update meta or reload of snapshot). Proposal: [Celeborn Custom Network Location Aware Replication](https://docs.google.com/document/d/11M_MKKnIXCTExJHMX-OMTq7SBpkl8fJMlpy8hLgmev0/edit#heading=h.s3vnydz589z5) ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Updated the unit tests. Closes #2367 from akpatnam25/CELEBORN-1313. Authored-by: Aravind Patnam <apatnam@linkedin.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:835437f
Author:Chandni Singh
Committer:zky.zhoukeyong

[CELEBORN-1261] Add auth support to client ### What changes were proposed in this pull request? This enables client to push and fetch shuffle data securely to Celeborn Workers. ### Why are the changes needed? This change is required for adding authentication. ([CELEBORN-1011](https://issues.apache.org/jira/browse/CELEBORN-1011)). ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? It is part of bigger change which will be tested end to end. Closes #2360 from otterc/CELEBORN-1261. Authored-by: Chandni Singh <singh.chandni@gmail.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:6897b8b
Author:Chandni Singh
Committer:SteNicholas

[CELEBORN-1234] Master should persist the application meta in Ratis and push it to the Workers ### What changes were proposed in this pull request? This enables Celeborn Master to persist application meta in Ratis and also push it to Celeborn Workers when it receives the requests for slots from the LifecycleManager. ### Why are the changes needed? This change is required for adding authentication. ([CELEBORN-1011](https://issues.apache.org/jira/browse/CELEBORN-1011)). ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added some UTs. Closes #2346 from otterc/CELEBORN-1234. Authored-by: Chandni Singh <singh.chandni@gmail.com> Signed-off-by: SteNicholas <programgeek@163.com>

Commit:d5a1bcd
Author:Chandni Singh
Committer:waitinfuture

[CELEBORN-1256] Added internal port and auth support to Celeborn worker ### What changes were proposed in this pull request? This adds an internal port and auth support to Celeborn Wokers. 1. Internal port is used by a worker to receive messages from Celeborn Master. 2. Authentication support for secure communication with clients. This change doesn't add the support in clients to communicate to the Workers securely. That will be in a future change. This change targets just adding the port and auth support to Worker. The following items from the proposal are still pending: - Persisting the app secrets in Ratis. - Forwarding secrets to Workers and having ability for the workers to pull registration info from the Master. - Secured communication between workers and clients. ### Why are the changes needed? It is needed for adding authentication support to Celeborn ([CELEBORN-1011](https://issues.apache.org/jira/browse/CELEBORN-1011)) ### Does this PR introduce _any_ user-facing change? Yes ### How was this patch tested? Part of a bigger change. For this change, only modified existing UTs. Closes #2292 from otterc/CELEBORN-1256. Authored-by: Chandni Singh <singh.chandni@gmail.com> Signed-off-by: waitinfuture <zky.zhoukeyong@alibaba-inc.com>

Commit:f7c8c9d
Author:SteNicholas

[CELEBORN-1174][0.4] Introduce application dimension resource consumption metrics ### What changes were proposed in this pull request? Cherry pick #2161. Introduce application dimension resource consumption metrics for `ResourceConsumptionSource`. ### Why are the changes needed? `ResourceConsumption` namespace metrics are generated for each user and they are identified using a metric tag at present. It's recommended to introduce application dimension resource consumption metrics that expose application dimension resource consumption of Master and Worker. By monitoring resource consumption in the application dimension, you can obtain the actual situation of application resource consumption. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - `WorkerInfoSuite#WorkerInfo toString output` - `PbSerDeUtilsTest#fromAndToPbResourceConsumption` - `MasterStateMachineSuitej#testObjSerde` Closes #2279 from SteNicholas/CELEBORN-1174-0.4. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: SteNicholas <programgeek@163.com>

Commit:05fa11b
Author:SteNicholas

[CELEBORN-1174] Introduce application dimension resource consumption metrics ### What changes were proposed in this pull request? Introduce application dimension resource consumption metrics for `ResourceConsumptionSource`. ### Why are the changes needed? `ResourceConsumption` namespace metrics are generated for each user and they are identified using a metric tag at present. It's recommended to introduce application dimension resource consumption metrics that expose application dimension resource consumption of Master and Worker. By monitoring resource consumption in the application dimension, you can obtain the actual situation of application resource consumption. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - `WorkerInfoSuite#WorkerInfo toString output` - `PbSerDeUtilsTest#fromAndToPbResourceConsumption` - `MasterStateMachineSuitej#testObjSerde` Closes #2161 from SteNicholas/CELEBORN-1174. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: SteNicholas <programgeek@163.com>

Commit:e71d912
Author:Shuang

[CELEBORN-1245] Support Celeborn Master(Leader) to manage workers ### What changes were proposed in this pull request? 1. Support Celeborn Master(Leader) to manage workers by sending event when heartbeat 2. Add Worker Status to Worker then we can know the status of the workers(such as during decommission...) 3. Add Http interface for master to handleWorkerEvent/getWorkerEvent ### Why are the changes needed? Currently, we only support managing the status of workers on the worker side. This pr supports the master to manage the status of all workers. By sending events such as (Decommission/Graceful/Exit) when heartbeat, workers can be asynchronously execute the command from master. MeanWhile we can't know what the worker status during worker decommission so this pr add worker status to tell the exactly status of the worker. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GA Closes #2255 from RexXiong/CELEBORN-1245. Authored-by: Shuang <lvshuang.xjs@alibaba-inc.com> Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>

Commit:a86a315
Author:Chandni Singh
Committer:zky.zhoukeyong

[CELEBORN-1229] Support for application registration with Celeborn Master ### What changes were proposed in this pull request? This adds support for applications to register with Celeborn Master by introducing the `RegistrationClientBootstrap`, `RegistrationServerBootstrap`, and `RegistrationRpcHandler` classes, which facilitate the client connection setup with the Celeborn Master. The registration protocol details are described in the [auth proposal](https://docs.google.com/document/d/1D1U2COYhS3ob7l0t2WghRhBk_Fci9RGx-2FBXA3nvXk/edit#heading=h.po9dc3r1kb3k). ### Why are the changes needed? The changes are needed for adding authentication to Celeborn. See [CELEBORN-1011](https://issues.apache.org/jira/browse/CELEBORN-1011). ### Does this PR introduce _any_ user-facing change? Add the config `celeborn.auth.enabled` ### How was this patch tested? Added UTs. Closes #2231 from otterc/CELEBORN-1229. Authored-by: Chandni Singh <singh.chandni@gmail.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:7be05b4
Author:mingji
Committer:zky.zhoukeyong

[CELEBORN-1133] Refactor fileinfo ### What changes were proposed in this pull request? Rename FileWriter to PartitionLocationDataWriter, add storageManager, delete fileinfo, and flusher in the constructor. FileInfo(userIdentifier,partitionSplitEnabled,fileMeta) – NonMemoryFileInfo(streams,filePath,storageType,bytesFlushed) – MemoryFileInfo(length,buffer) FileMeta – reduceFileMeta(chunkOffsets,sorted) – mapFileMeta(bufferSize,numSubPartitions) ### Why are the changes needed? 1. To make concepts more clear. 2. To support memory storage and HDFS slot management. ### Does this PR introduce _any_ user-facing change? NO. ### How was this patch tested? GA and cluster test with worker kill. Closes #2130 from FMX/b1133. Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com> Co-authored-by: Keyong Zhou <zky.zhoukeyong@alibaba-inc.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:90b4f2b
Author:zky.zhoukeyong
Committer:zky.zhoukeyong

[CELEBORN-1175] Add UT for commit files ### What changes were proposed in this pull request? As title. ### Why are the changes needed? As title. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Passes UTs. Closes #2162 from waitinfuture/1175-2. Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> (cherry picked from commit 309153a99be238a0faa4e8e0193b2814046a5d4a) Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:309153a
Author:zky.zhoukeyong

[CELEBORN-1175] Add UT for commit files ### What changes were proposed in this pull request? As title. ### Why are the changes needed? As title. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Passes UTs. Closes #2162 from waitinfuture/1175-2. Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:980c48c
Author:zky.zhoukeyong
Committer:zky.zhoukeyong

[CELEBORN-1167] Avoid calling parmap when destroy slots ### What changes were proposed in this pull request? As title ### Why are the changes needed? One user reported that LifecycleManager's parmap can create huge number of threads and causes OOM. ![image](https://github.com/apache/incubator-celeborn/assets/948245/1e9a0b83-32fe-40d5-8739-2b370e030fc8) There are four places where parmap is called: 1. When LifecycleManager commits files 2. When LifecycleManager reserves slots 3. When LifecycleManager setup connection to workers 4. When LifecycleManager call destroy slots This PR fixes the fourth one. To be more detail, this PR eliminates `parmap` when destroying slots, and also replaces `askSync` with `ask`. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual test and GA. Closes #2156 from waitinfuture/1167. Lead-authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> Co-authored-by: cxzl25 <cxzl25@users.noreply.github.com> Co-authored-by: Keyong Zhou <waitinfuture@gmail.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> (cherry picked from commit 01feb93abbd01a2721b4f7c64ace75123e6bf7e3) Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:01feb93
Author:zky.zhoukeyong

[CELEBORN-1167] Avoid calling parmap when destroy slots ### What changes were proposed in this pull request? As title ### Why are the changes needed? One user reported that LifecycleManager's parmap can create huge number of threads and causes OOM. ![image](https://github.com/apache/incubator-celeborn/assets/948245/1e9a0b83-32fe-40d5-8739-2b370e030fc8) There are four places where parmap is called: 1. When LifecycleManager commits files 2. When LifecycleManager reserves slots 3. When LifecycleManager setup connection to workers 4. When LifecycleManager call destroy slots This PR fixes the fourth one. To be more detail, this PR eliminates `parmap` when destroying slots, and also replaces `askSync` with `ask`. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual test and GA. Closes #2156 from waitinfuture/1167. Lead-authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> Co-authored-by: cxzl25 <cxzl25@users.noreply.github.com> Co-authored-by: Keyong Zhou <waitinfuture@gmail.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:a03ce6c
Author:Chandni Singh
Committer:zky.zhoukeyong

[CELEBORN-1157] Add client-side support for Sasl Authentication in the transport layer ### What changes were proposed in this pull request? This adds the client side Sasl authentication support in the transport layer. Most of this code is taken from Apache Spark. ### Why are the changes needed? The changes are needed for adding authentication to Celeborn. See [CELEBORN-1011](https://issues.apache.org/jira/browse/CELEBORN-1011). ### Does this PR introduce _any_ user-facing change? Added a configuration for Sasl request timeout ### How was this patch tested? Will be adding `CelebornSaslSuiteJ.java` (https://github.com/apache/incubator-celeborn/pull/2105) that tests the end-to-end Sasl flow. Closes #2139 from otterc/CELEBORN-1157. Authored-by: Chandni Singh <singh.chandni@gmail.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:8516df4
Author:wangshengjie
Committer:zky.zhoukeyong

[CELEBORN-1151] Request slots when register shuffle should filter the workers excluded by application ### What changes were proposed in this pull request? When request slots, filter workers excluded by application ### Why are the changes needed? If worker alive but can not service, register shuffle will remove the worker from application client exclude list and next shuffle may reserve slots on this worker,this will cause application revive unexpectly ### Does this PR introduce _any_ user-facing change? Yes, request slots will filter workers excluded by application ### How was this patch tested? UT, Closes #2131 from wangshengjie123/fix-request-slots-blacklist. Authored-by: wangshengjie <wangshengjie3@xiaomi.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:113311d
Author:mingji
Committer:Shuang

[CELEBORN-1081][FOLLOWUP] Remove UNKNOWN_DISK and allocate all slots to disk ### What changes were proposed in this pull request? 1. Remove UNKNOWN_DISK from StorageInfo. 2. Enable load-aware slots allocation when there is HDFS. ### Why are the changes needed? To support the application's config about available storage types. ### Does this PR introduce _any_ user-facing change? no. ### How was this patch tested? GA and Cluster. Closes #2098 from FMX/B1081-1. Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com> Signed-off-by: Shuang <lvshuang.tb@gmail.com>

Commit:aee4155
Author:Erik.fang
Committer:zky.zhoukeyong

[CELEBORN-955] Re-run Spark Stage for Celeborn Shuffle Fetch Failure ### What changes were proposed in this pull request? Currently, Celeborn uses replication to handle shuffle data lost for celeborn shuffle reader, this PR implements an alternative solution by Spark stage resubmission. Design doc: https://docs.google.com/document/d/1dkG6fww3g99VAb1wkphNlUES_MpngVPNg8601chmVp8/edit ### Why are the changes needed? Spark stage resubmission uses less resources compared with replication, and some Celeborn users are also asking for it ### Does this PR introduce _any_ user-facing change? a new config celeborn.client.fetch.throwsFetchFailure is introduced to enable this feature ### How was this patch tested? two UTs are attached, and we also tested it in Ant Group's Dev spark cluster Closes #1924 from ErikFang/Re-run-Spark-Stage-for-Celeborn-Shuffle-Fetch-Failure. Lead-authored-by: Erik.fang <fmerik@gmail.com> Co-authored-by: Cheng Pan <pan3793@gmail.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:52eddc5
Author:SteNicholas
Committer:mingji

[CELEBORN-448] Support exclude worker manually ### What changes were proposed in this pull request? Support exclude worker manually given worker id. This worker is added into excluded workers manually. ### Why are the changes needed? Celeborn supports to shuffle client-side fetch and push exclude workers on failure at present. It's necessary to exclude worker manually for maintaining the Celeborn cluster. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - `HttpUtilsSuite` - `DefaultMetaSystemSuiteJ#testHandleWorkerExclude` - `RatisMasterStatusSystemSuiteJ#testHandleWorkerExclude` - `MasterStateMachineSuiteJ#testObjSerde` Closes #1997 from SteNicholas/CELEBORN-448. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>

Commit:9c87bd4
Author:mingji
Committer:zky.zhoukeyong

[CELEBORN-1081] Client support `celeborn.storage.activeTypes` config 1.To support `celeborn.storage.activeTypes` in Client. 2.Master will ignore slots for "UNKNOWN_DISK". Enable client application to select storage types to use. Yes. GA and cluster. Closes #2045 from FMX/B1081. Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com> Signed-off-by: Shuang <lvshuang.tb@gmail.com>

Commit:5e77b85
Author:mingji
Committer:Shuang

[CELEBORN-1081] Client support `celeborn.storage.activeTypes` config ### What changes were proposed in this pull request? 1.To support `celeborn.storage.activeTypes` in Client. 2.Master will ignore slots for "UNKNOWN_DISK". ### Why are the changes needed? Enable client application to select storage types to use. ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? GA and cluster. Closes #2045 from FMX/B1081. Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com> Signed-off-by: Shuang <lvshuang.tb@gmail.com>

Commit:9244cf2
Author:SteNicholas
Committer:Shuang

[CELEBORN-772] Convert StreamChunkSlice, ChunkFetchRequest, TransportableError to PB ### What changes were proposed in this pull request? `StreamChunkSlice`, `ChunkFetchRequest` and `TransportableError` should merge to transport messages to enhance celeborn's compatibility. ### Why are the changes needed? 1. Improves celeborn's transport flexibility to change RPC. 2. Makes Compatible with 0.2 client. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - `FetchHandlerSuiteJ` - `RequestTimeoutIntegrationSuiteJ` - `ChunkFetchIntegrationSuiteJ` Closes #1982 from SteNicholas/CELEBORN-772. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: Shuang <lvshuang.tb@gmail.com>

Commit:c4135dc
Author:Fu Chen
Committer:zky.zhoukeyong

[CELEBORN-980] Asynchronously delete original files to fix `ReusedExchange` bug ### What changes were proposed in this pull request? The `ReusedExchange` operator has the potential to generate different types of fetch requests, including both non-range and range requests. Currently, an issue arises due to the synchronous deletion of the original file by the Celeborn worker upon completion of sorting. This issue leads to the failure of non-range requests following a range request for the same partition. the snippets to reproduce this bug ```scala val sparkConf = new SparkConf().setAppName("celeborn-test").setMaster("local[2]") .set("spark.shuffle.manager", "org.apache.spark.shuffle.celeborn.SparkShuffleManager") .set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", masterInfo._1.rpcEnv.address.toString) .set("spark.sql.autoBroadcastJoinThreshold", "-1") .set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "100") .set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "100") val spark = SparkSession.builder() .config(sparkConf) .getOrCreate() spark.range(0, 1000, 1, 10) .selectExpr("id as k1", "id as v1") .createOrReplaceTempView("ta") spark.range(0, 1000, 1, 10) .selectExpr("id % 1 as k21", "id % 1 as k22", "id as v2") .createOrReplaceTempView("tb") spark.range(140) .select( col("id").cast("long").as("k3"), concat(col("id").cast("string"), lit("a")).as("v3")) .createOrReplaceTempView("tc") spark.sql( """ |SELECT * |FROM ta |LEFT JOIN tb ON ta.k1 = tb.k21 |LEFT JOIN tc ON tb.k22 = tc.k3 |""".stripMargin) .createOrReplaceTempView("v1") spark.sql( """ |SELECT * FROM v1 WHERE v3 IS NOT NULL |UNION |SELECT * FROM v1 |""".stripMargin) .collect() ``` This PR proposes a solution to address this problem. It introduces an asynchronous thread for the removal of the original file. Once the sorted file is generated for a given partition, this modification ensures that both non-range and range fetch requests will be able to and only fetch the sorted file once it is generated for a given partition. this activity diagram of `openStream` ![openStream](https://github.com/apache/incubator-celeborn/assets/8537877/633cc5b8-e673-45a0-860e-e1f7e50c8965) ### Does this PR introduce _any_ user-facing change? No, only bug fix ### How was this patch tested? UT Closes #1932 from cfmcgrady/fix-partition-sort-bug-v4. Authored-by: Fu Chen <cfmcgrady@gmail.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:dd05933
Author:SteNicholas
Committer:Cheng Pan

[CELEBORN-771][FLINK] Convert PushDataHandShake, RegionFinish, RegionStart to PB ### What changes were proposed in this pull request? `PushDataHandShake`, `RegionFinish`, and `RegionStart` should merge to transport messages to enhance celeborn's compatibility. ### Why are the changes needed? 1. Improves celeborn's transport flexibility to change RPC. 2. Makes Compatible with 0.2 client. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - `RemoteShuffleOutputGateSuiteJ` Closes #1910 from SteNicholas/CELEBORN-771. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com> (cherry picked from commit 55e85055b80111b6f3662474c8fd500bf6589f2a)

Commit:00251ac
Author:SteNicholas
Committer:Shuang

[CELEBORN-770][FLINK] Convert BacklogAnnouncement, BufferStreamEnd, ReadAddCredit to PB `BacklogAnnouncement`, `BufferStreamEnd`, and `ReadAddCredit` should merge to transport messages to enhance celeborn's compatibility. 1. Improves celeborn's transport flexibility to change RPC. 2. Makes Compatible with 0.2 client. No. - `TransportFrameDecoderWithBufferSupplierSuiteJ` Closes #1905 from SteNicholas/CELEBORN-770. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: Shuang <lvshuang.tb@gmail.com> (cherry picked from commit 2407cae43ab44b6d7a7394736e7c12cbbd51ebb5) Signed-off-by: Shuang <lvshuang.tb@gmail.com>

Commit:2407cae
Author:SteNicholas
Committer:Shuang

[CELEBORN-770][FLINK] Convert BacklogAnnouncement, BufferStreamEnd, ReadAddCredit to PB ### What changes were proposed in this pull request? `BacklogAnnouncement`, `BufferStreamEnd`, and `ReadAddCredit` should merge to transport messages to enhance celeborn's compatibility. ### Why are the changes needed? 1. Improves celeborn's transport flexibility to change RPC. 2. Makes Compatible with 0.2 client. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - `TransportFrameDecoderWithBufferSupplierSuiteJ` Closes #1905 from SteNicholas/CELEBORN-770. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: Shuang <lvshuang.tb@gmail.com>

Commit:55e8505
Author:SteNicholas
Committer:mingji

[CELEBORN-771][FLINK] Convert PushDataHandShake, RegionFinish, RegionStart to PB ### What changes were proposed in this pull request? `PushDataHandShake`, `RegionFinish`, and `RegionStart` should merge to transport messages to enhance celeborn's compatibility. ### Why are the changes needed? 1. Improves celeborn's transport flexibility to change RPC. 2. Makes Compatible with 0.2 client. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - `RemoteShuffleOutputGateSuiteJ` Closes #1910 from SteNicholas/CELEBORN-771. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>

Commit:2d58dcd
Author:Shuang
Committer:zky.zhoukeyong

[CELEBORN-468] Timeout useless lostWorkers/shutdownWorkers meta ### What changes were proposed in this pull request? As title ### Why are the changes needed? If Worker lost or lost after graceful shutdown, Master would retain these lostWorker/shutdownWorkers meta permanently, These meta would cause some noisy message in lifecycleManager. For these meta better to delete them after a while ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT & E2E test Closes #1916 from RexXiong/CELEBORN-468. Authored-by: Shuang <lvshuang.tb@gmail.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> (cherry picked from commit 615479c44289bd79c83774eabf99760db5d53e97) Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:615479c
Author:Shuang
Committer:zky.zhoukeyong

[CELEBORN-468] Timeout useless lostWorkers/shutdownWorkers meta ### What changes were proposed in this pull request? As title ### Why are the changes needed? If Worker lost or lost after graceful shutdown, Master would retain these lostWorker/shutdownWorkers meta permanently, These meta would cause some noisy message in lifecycleManager. For these meta better to delete them after a while ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT & E2E test Closes #1916 from RexXiong/CELEBORN-468. Authored-by: Shuang <lvshuang.tb@gmail.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:8f000d2
Author:zhongqiang.czq
Committer:zhongqiang.czq

[CELEBORN-627][FLINK] Support split partitions ### What changes were proposed in this pull request? In MapPartiitoin, datas are split into regions. 1. Unlike ReducePartition whose partition split can occur on pushing data to keep MapPartition data ordering, PartitionSplit only be done on the time of sending PushDataHandShake or RegionStart messages (As shown in the following image). That's to say that the partition split only appear at the beginnig of a region but not inner a region. > Notice: if the client side think that it's failed to push HandShake or RegionStart messages. but the worker side can still receive normal HandShake/RegionStart message. After client revive succss, it don't push any messages to old partition, so the worker having the old partition will create a empty file. After committing files, the worker will return empty commitids. That's to say that empty file will be filterd after committing files and ReduceTask will not read any empty files. ![image](https://github.com/apache/incubator-celeborn/assets/96606293/468fd660-afbc-42c1-b111-6643f5c1e944) 2. PushData/RegioinFinish don't care the following cases: - Diskfull - ExceedPartitionSplitThreshold - Worker ShuttingDown so if one of the above three conditions appears, PushData and RegionFinish cant still do as normal. Workers should consider the ShuttingDown case and try best to wait all the regions finished before shutting down. if PushData or RegionFinish failed like network timeout and so on, then MapTask will failed and start another attempte maptask. ![image](https://github.com/apache/incubator-celeborn/assets/96606293/db9f9166-2085-4be1-b09e-cf73b469c55b) 3. how shuffle read supports partition split? ReduceTask should get split paritions by order and open the stream by partition epoc orderly ### Why are the changes needed? PartiitonSplit is not supported by MapPartition from now. There still a risk that a partition file'size is too large to store the file on worker disk. To avoid this risk, this pr introduces partition split in shuffle read and shuffle write. ### Does this PR introduce _any_ user-facing change? NO. ### How was this patch tested? UT and manual TPCDS test Closes #1550 from FMX/CELEBORN-627. Lead-authored-by: zhongqiang.czq <zhongqiang.czq@alibaba-inc.com> Co-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com> Co-authored-by: Ethan Feng <ethanfeng@apache.org> Signed-off-by: zhongqiang.czq <zhongqiang.czq@alibaba-inc.com> (cherry picked from commit b66eaff880e91864a21c96dbac94fa5f8cd84f4c) Signed-off-by: zhongqiang.czq <zhongqiang.czq@alibaba-inc.com>

Commit:b66eaff
Author:zhongqiang.czq

[CELEBORN-627][FLINK] Support split partitions ### What changes were proposed in this pull request? In MapPartiitoin, datas are split into regions. 1. Unlike ReducePartition whose partition split can occur on pushing data to keep MapPartition data ordering, PartitionSplit only be done on the time of sending PushDataHandShake or RegionStart messages (As shown in the following image). That's to say that the partition split only appear at the beginnig of a region but not inner a region. > Notice: if the client side think that it's failed to push HandShake or RegionStart messages. but the worker side can still receive normal HandShake/RegionStart message. After client revive succss, it don't push any messages to old partition, so the worker having the old partition will create a empty file. After committing files, the worker will return empty commitids. That's to say that empty file will be filterd after committing files and ReduceTask will not read any empty files. ![image](https://github.com/apache/incubator-celeborn/assets/96606293/468fd660-afbc-42c1-b111-6643f5c1e944) 2. PushData/RegioinFinish don't care the following cases: - Diskfull - ExceedPartitionSplitThreshold - Worker ShuttingDown so if one of the above three conditions appears, PushData and RegionFinish cant still do as normal. Workers should consider the ShuttingDown case and try best to wait all the regions finished before shutting down. if PushData or RegionFinish failed like network timeout and so on, then MapTask will failed and start another attempte maptask. ![image](https://github.com/apache/incubator-celeborn/assets/96606293/db9f9166-2085-4be1-b09e-cf73b469c55b) 3. how shuffle read supports partition split? ReduceTask should get split paritions by order and open the stream by partition epoc orderly ### Why are the changes needed? PartiitonSplit is not supported by MapPartition from now. There still a risk that a partition file'size is too large to store the file on worker disk. To avoid this risk, this pr introduces partition split in shuffle read and shuffle write. ### Does this PR introduce _any_ user-facing change? NO. ### How was this patch tested? UT and manual TPCDS test Closes #1550 from FMX/CELEBORN-627. Lead-authored-by: zhongqiang.czq <zhongqiang.czq@alibaba-inc.com> Co-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com> Co-authored-by: Ethan Feng <ethanfeng@apache.org> Signed-off-by: zhongqiang.czq <zhongqiang.czq@alibaba-inc.com>

Commit:cd0aa53
Author:mingji
Committer:zky.zhoukeyong

[CELEBORN-752] Support read local shuffle file for spark ### What changes were proposed in this pull request? For spark clusters, support read local shuffle file if Celeborn is co-deployed with yarn node managers. This PR help to reduce the number of active connections. ### Why are the changes needed? Ditto. ### Does this PR introduce _any_ user-facing change? NO. ### How was this patch tested? GA and cluster. The performance is identical whether you enable local reader, but the active connection number may vary according to your connections per peer. <img width="951" alt="截屏2023-08-16 20 20 14" src="https://github.com/apache/incubator-celeborn/assets/4150993/9106e731-28fc-4e78-9c05-ae6a269d249a"> The active connection number changed from 3745 to 2894. This PR will help to improve cluster stability. Closes #1812 from FMX/CELEBORN-752. Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> (cherry picked from commit 505ba804c7d7890922e09aaacd2b155c70328a35) Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:505ba80
Author:mingji
Committer:zky.zhoukeyong

[CELEBORN-752] Support read local shuffle file for spark ### What changes were proposed in this pull request? For spark clusters, support read local shuffle file if Celeborn is co-deployed with yarn node managers. This PR help to reduce the number of active connections. ### Why are the changes needed? Ditto. ### Does this PR introduce _any_ user-facing change? NO. ### How was this patch tested? GA and cluster. The performance is identical whether you enable local reader, but the active connection number may vary according to your connections per peer. <img width="951" alt="截屏2023-08-16 20 20 14" src="https://github.com/apache/incubator-celeborn/assets/4150993/9106e731-28fc-4e78-9c05-ae6a269d249a"> The active connection number changed from 3745 to 2894. This PR will help to improve cluster stability. Closes #1812 from FMX/CELEBORN-752. Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:d39aab2
Author:SteNicholas
Committer:zky.zhoukeyong

[CELEBORN-830] Check available workers in CelebornShuffleFallbackPolicyRunner ### What changes were proposed in this pull request? `CelebornShuffleFallbackPolicyRunner` could not only check quota, but also check whether cluster has available workers. If there is no available workers, fallback to external shuffle. ### Why are the changes needed? `CelebornShuffleFallbackPolicyRunner` adds a check for available workers. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - `SparkShuffleManagerSuite#testClusterNotAvailableWithAvailableWorkers` Closes #1814 from SteNicholas/CELEBORN-830. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:4625484
Author:SteNicholas
Committer:zky.zhoukeyong

[CELEBORN-830] Check available workers in CelebornShuffleFallbackPolicyRunner ### What changes were proposed in this pull request? `CelebornShuffleFallbackPolicyRunner` could not only check quota, but also check whether cluster has available workers. If there is no available workers, fallback to external shuffle. ### Why are the changes needed? `CelebornShuffleFallbackPolicyRunner` adds a check for available workers. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - `SparkShuffleManagerSuite#testClusterNotAvailableWithAvailableWorkers` Closes #1814 from SteNicholas/CELEBORN-830. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:3f77210
Author:Keyong Zhou
Committer:zky.zhoukeyong

[CELEBORN-920] Worker sends its load to Master through heartbeat ### What changes were proposed in this pull request? Adding a flag indicating high load in the worker's heartbeat allows the master to better schedule the workers ### Why are the changes needed? In our production environment, there is a node with abnormally high load, but the master is not aware of this situation. It assigned numerous jobs to this node, and as a result, the stability of these jobs has been affected. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT Closes #1840 from JQ-Cao/920. Lead-authored-by: Keyong Zhou <zky.zhoukeyong@alibaba-inc.com> Co-authored-by: caojiaqing <caojiaqing@bilibili.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> (cherry picked from commit 1d04a2328988b81276a8ec3e7a5988de488efba7) Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:1d04a23
Author:Keyong Zhou
Committer:zky.zhoukeyong

[CELEBORN-920] Worker sends its load to Master through heartbeat ### What changes were proposed in this pull request? Adding a flag indicating high load in the worker's heartbeat allows the master to better schedule the workers ### Why are the changes needed? In our production environment, there is a node with abnormally high load, but the master is not aware of this situation. It assigned numerous jobs to this node, and as a result, the stability of these jobs has been affected. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT Closes #1840 from JQ-Cao/920. Lead-authored-by: Keyong Zhou <zky.zhoukeyong@alibaba-inc.com> Co-authored-by: caojiaqing <caojiaqing@bilibili.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:089bc19
Author:mingji
Committer:zky.zhoukeyong

[CELEBORN-846][FOLLOWUP] Fix broken link caused by unknown RPC ### What changes were proposed in this pull request? Keep ReleaseSlots RPC to make sure that 0.3 client can worker with 0.3.1-SNAPSHOT and 0.4.0-SNAPSHOT. This PR will need to merged into main and branch-0.3. ### Why are the changes needed? Ditto. ### Does this PR introduce _any_ user-facing change? NO. ### How was this patch tested? GA and cluster. Closes #1794 from FMX/CELEBORN-846-FOLLOWUP. Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com> Co-authored-by: Ethan Feng <fengmingxiao.fmx@alibaba-inc.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> (cherry picked from commit 7d0e25700145c50557b9ccfa57ee7475ef1204e6) Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:7d0e257
Author:mingji
Committer:zky.zhoukeyong

[CELEBORN-846][FOLLOWUP] Fix broken link caused by unknown RPC ### What changes were proposed in this pull request? Keep ReleaseSlots RPC to make sure that 0.3 client can worker with 0.3.1-SNAPSHOT and 0.4.0-SNAPSHOT. This PR will need to merged into main and branch-0.3. ### Why are the changes needed? Ditto. ### Does this PR introduce _any_ user-facing change? NO. ### How was this patch tested? GA and cluster. Closes #1794 from FMX/CELEBORN-846-FOLLOWUP. Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com> Co-authored-by: Ethan Feng <fengmingxiao.fmx@alibaba-inc.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:a1de276
Author:zky.zhoukeyong
Committer:zky.zhoukeyong

[CELEBORN-152] Add config to limit max workers when offering slots ### What changes were proposed in this pull request? Add config to limit max workers when offering slots, the config can be set both in server side and client side. Celeborn will choose the smaller positive configs from client and master. ### Why are the changes needed? For large Celeborn clusters, users may want to limit the number of workers that a shuffle can spread, reasons are: 1. One worker failure will not affect all applications 2. One huge shuffle will not affect all applications 3. It's more efficient to limit a shuffle within a restricted number of workers, say 100, than spreading across a large number of workers, say 1000, because the network connections in pushing data is `number of ShuffleClient` * `number of allocated Workers` The recommended number of Workers should depend on workload and Worker hardware, and this can be configured per application, so it's relatively flexible. ### Does this PR introduce _any_ user-facing change? No, added a new configuration. ### How was this patch tested? Added ITs and passes GA. Closes #1790 from waitinfuture/152. Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:7027594
Author:Angerszhuuuu
Committer:zky.zhoukeyong

[CELEBORN-846] Remove unused updateReleaseSlotsMeta in master side ### What changes were proposed in this pull request? As title ### Why are the changes needed? CELEBORN-791 removed sending the ReleaseSlotsRequest from worker, so Master is not required to handle it. ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1767 from AngersZhuuuu/CELEBORN-846. Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>

Commit:6ea1ee2
Author:zky.zhoukeyong

[CELEBORN-152] Add config to limit max workers when offering slots ### What changes were proposed in this pull request? Add config to limit max workers when offering slots, the config can be set both in server side and client side. Celeborn will choose the smaller positive configs from client and master. ### Why are the changes needed? For large Celeborn clusters, users may want to limit the number of workers that a shuffle can spread, reasons are: 1. One worker failure will not affect all applications 2. One huge shuffle will not affect all applications 3. It's more efficient to limit a shuffle within a restricted number of workers, say 100, than spreading across a large number of workers, say 1000, because the network connections in pushing data is `number of ShuffleClient` * `number of allocated Workers` The recommended number of Workers should depend on workload and Worker hardware, and this can be configured per application, so it's relatively flexible. ### Does this PR introduce _any_ user-facing change? No, added a new configuration. ### How was this patch tested? Added ITs and passes GA. Closes #1790 from waitinfuture/152. Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:e8d4c8e
Author:mingji
Committer:zky.zhoukeyong

[CELEBORN-760] Convert OpenStream and StreamHandler to Pb ### What changes were proposed in this pull request? Merge OpenStream and StreamHandler to transport messages to enhance celeborn's compatibility. ### Why are the changes needed? 1. Improve flexibility to change RPC. 2. Compatible with 0.2 client. ### Does this PR introduce _any_ user-facing change? NO. ### How was this patch tested? UT and cluster. Closes #1750 from FMX/CELEBORN-760. Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com> Co-authored-by: Ethan Feng <fengmingxiao.fmx@alibaba-inc.com> Co-authored-by: Keyong Zhou <zhouky@apache.org> Co-authored-by: Keyong Zhou <waitinfuture@gmail.com> Co-authored-by: Keyong Zhou <zky.zhoukeyong@alibaba-inc.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> (cherry picked from commit ea39a9372aec1995c7bc38dcadc38a41127bc50b) Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:ea39a93
Author:mingji
Committer:zky.zhoukeyong

[CELEBORN-760] Convert OpenStream and StreamHandler to Pb ### What changes were proposed in this pull request? Merge OpenStream and StreamHandler to transport messages to enhance celeborn's compatibility. ### Why are the changes needed? 1. Improve flexibility to change RPC. 2. Compatible with 0.2 client. ### Does this PR introduce _any_ user-facing change? NO. ### How was this patch tested? UT and cluster. Closes #1750 from FMX/CELEBORN-760. Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com> Co-authored-by: Ethan Feng <fengmingxiao.fmx@alibaba-inc.com> Co-authored-by: Keyong Zhou <zhouky@apache.org> Co-authored-by: Keyong Zhou <waitinfuture@gmail.com> Co-authored-by: Keyong Zhou <zky.zhoukeyong@alibaba-inc.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:e82a8e8
Author:Angerszhuuuu

[CELEBORN-846] Remove unused updateReleaseSlotsMeta in master side ### What changes were proposed in this pull request? As title ### Why are the changes needed? CELEBORN-791 removed sending the ReleaseSlotsRequest from worker, so Master is not required to handle it. ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1767 from AngersZhuuuu/CELEBORN-846. Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>

Commit:f10e6f0
Author:Cheng Pan

Revert "[CELEBORN-798] Add heartbeat from client to LifecycleManager to clean…" This reverts commit 20b60aba6af855ce3563e41a1227eb903b916146.

Commit:0db9194
Author:Cheng Pan

Revert "[CELEBORN-798] Add heartbeat from client to LifecycleManager to clean…" This reverts commit e56a8a8bed9c0c162b863ca3e08adb1731e4b7c1.

Commit:20b60ab
Author:zky.zhoukeyong
Committer:zky.zhoukeyong

[CELEBORN-798] Add heartbeat from client to LifecycleManager to clean… …up client ### What changes were proposed in this pull request? Add heartbeat from client to lifecycle manager. In this PR heartbeat request contains local shuffle ids from client, lifecycle manager checks with it's local set and returns ids it doesn't know. Upon receiving response, client calls ```unregisterShuffle``` for cleanup. ### Why are the changes needed? Before this PR, client side ```unregisterShuffle``` is never called. When running TPCDS 3T with spark thriftserver without DRA, I found the Executor's heap contains 1.6 million PartitionLocation objects (and StorageInfo): ![image](https://github.com/apache/incubator-celeborn/assets/948245/43658369-7763-4511-a5b0-9b3fbdf02005) After this PR, the number of PartitionLocation objects decreases to 275 thousands ![image](https://github.com/apache/incubator-celeborn/assets/948245/45f8f849-186d-4cad-83c8-64bd6d18debc) This heartbeat can be extended in the future for other purposes, i.e. reporting client's metrics. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Passes GA and manual test. Closes #1719 from waitinfuture/798. Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> (cherry picked from commit e56a8a8bed9c0c162b863ca3e08adb1731e4b7c1) Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:e56a8a8
Author:zky.zhoukeyong

[CELEBORN-798] Add heartbeat from client to LifecycleManager to clean… …up client ### What changes were proposed in this pull request? Add heartbeat from client to lifecycle manager. In this PR heartbeat request contains local shuffle ids from client, lifecycle manager checks with it's local set and returns ids it doesn't know. Upon receiving response, client calls ```unregisterShuffle``` for cleanup. ### Why are the changes needed? Before this PR, client side ```unregisterShuffle``` is never called. When running TPCDS 3T with spark thriftserver without DRA, I found the Executor's heap contains 1.6 million PartitionLocation objects (and StorageInfo): ![image](https://github.com/apache/incubator-celeborn/assets/948245/43658369-7763-4511-a5b0-9b3fbdf02005) After this PR, the number of PartitionLocation objects decreases to 275 thousands ![image](https://github.com/apache/incubator-celeborn/assets/948245/45f8f849-186d-4cad-83c8-64bd6d18debc) This heartbeat can be extended in the future for other purposes, i.e. reporting client's metrics. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Passes GA and manual test. Closes #1719 from waitinfuture/798. Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:ae9380a
Author:mingji
Committer:zky.zhoukeyong

[CELEBORN-791] Remove slots allocation simulation in master and use active slots sent from worker's heartbeat ### What changes were proposed in this pull request? Master won't simulate slots allocations and use active slots sent from worker. ### Why are the changes needed? I have observed that a new worker might allocate more slots than other workers when using the round-robin slot allocation algorithm. There is a logic error in processing heartbeat from worker. It will update disk info's active slots to max(current disk info active slots, disk info sent from worker active slots). If I registered a huge shuffle, master will allocate more slots than a disk's max slots and mark them as unknown disk slots but worker will count the unknown disk slots as active slots and report it to the master. Then the slots release logic can not distinguish unknown slots from a number so the release will not decrease active slots properly. Due to the gap between work and master, so I think it's OK to remove slots allocation simulation from worker and use active slots from worker. Before this patch: <img width="928" alt="截屏2023-07-12 16 51 15" src="https://github.com/apache/incubator-celeborn/assets/4150993/9c8a46d9-26a8-42f5-a956-938273277c9b"> After this patch: <img width="509" alt="截屏2023-07-12 16 25 52" src="https://github.com/apache/incubator-celeborn/assets/4150993/c49b3d91-14ea-4eb8-9b71-9aab73541faf"> ### Does this PR introduce _any_ user-facing change? NO. ### How was this patch tested? UT and cluster. Closes #1710 from FMX/CELEBORN-791. Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> (cherry picked from commit a4687716d2f1fcc4bf487ced70c0fb6b5de037f0) Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:a468771
Author:mingji
Committer:zky.zhoukeyong

[CELEBORN-791] Remove slots allocation simulation in master and use active slots sent from worker's heartbeat ### What changes were proposed in this pull request? Master won't simulate slots allocations and use active slots sent from worker. ### Why are the changes needed? I have observed that a new worker might allocate more slots than other workers when using the round-robin slot allocation algorithm. There is a logic error in processing heartbeat from worker. It will update disk info's active slots to max(current disk info active slots, disk info sent from worker active slots). If I registered a huge shuffle, master will allocate more slots than a disk's max slots and mark them as unknown disk slots but worker will count the unknown disk slots as active slots and report it to the master. Then the slots release logic can not distinguish unknown slots from a number so the release will not decrease active slots properly. Due to the gap between work and master, so I think it's OK to remove slots allocation simulation from worker and use active slots from worker. Before this patch: <img width="928" alt="截屏2023-07-12 16 51 15" src="https://github.com/apache/incubator-celeborn/assets/4150993/9c8a46d9-26a8-42f5-a956-938273277c9b"> After this patch: <img width="509" alt="截屏2023-07-12 16 25 52" src="https://github.com/apache/incubator-celeborn/assets/4150993/c49b3d91-14ea-4eb8-9b71-9aab73541faf"> ### Does this PR introduce _any_ user-facing change? NO. ### How was this patch tested? UT and cluster. Closes #1710 from FMX/CELEBORN-791. Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:1642090
Author:Angerszhuuuu

[CELEBORN-781] Refactor RPC message type name ### What changes were proposed in this pull request? After https://github.com/apache/incubator-celeborn/pull/1658 merged, we can format the message type now. ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1696 from AngersZhuuuu/CELEBORN-731. Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>

Commit:b4dfb03
Author:Angerszhuuuu

[CELEBORN-733] Clean unused GetBlacklist & GetBlacklistResponse ### What changes were proposed in this pull request? Clean unused GetBlacklist & GetBlacklistResponse ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1656 from AngersZhuuuu/CELEBORN-733. Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>

Commit:b892d9d
Author:mingji
Committer:zky.zhoukeyong

[CELEBORN-764] Fix celeborn on HDFS might clean using app directories ### What changes were proposed in this pull request? Make Celeborn leader clean expired app dirs on HDFS when an application is Lost. ### Why are the changes needed? If Celeborn is working on HDFS, the storage manager starts and cleans expired app directories, and the newly created worker will want to delete any unknown app directories. This will cause using app directories to be deleted unexpectedly. ### Does this PR introduce _any_ user-facing change? NO. ### How was this patch tested? UT and cluster. Closes #1678 from FMX/CELEBORN-764. Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com> Co-authored-by: Cheng Pan <pan3793@gmail.com> Co-authored-by: Ethan Feng <fengmingxiao.fmx@alibaba-inc.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> (cherry picked from commit d0ecf83fecd08a0c3ee1e223dd6ba3a0dcb6da16) Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:d0ecf83
Author:mingji
Committer:zky.zhoukeyong

[CELEBORN-764] Fix celeborn on HDFS might clean using app directories ### What changes were proposed in this pull request? Make Celeborn leader clean expired app dirs on HDFS when an application is Lost. ### Why are the changes needed? If Celeborn is working on HDFS, the storage manager starts and cleans expired app directories, and the newly created worker will want to delete any unknown app directories. This will cause using app directories to be deleted unexpectedly. ### Does this PR introduce _any_ user-facing change? NO. ### How was this patch tested? UT and cluster. Closes #1678 from FMX/CELEBORN-764. Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com> Co-authored-by: Cheng Pan <pan3793@gmail.com> Co-authored-by: Ethan Feng <fengmingxiao.fmx@alibaba-inc.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:d237464
Author:Fu Chen
Committer:Cheng Pan

[CELEBORN-726][FOLLOWUP] Update data replication terminology from `master/slave` to `primary/replica` in the codebase ### What changes were proposed in this pull request? As title ### Why are the changes needed? In order to distinguish it from the existing master/worker, refactor data replication terminology to 'primary/replica' for improved clarity and inclusivity in the codebase ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing tests Closes #1639 from cfmcgrady/primary-replica. Lead-authored-by: Fu Chen <cfmcgrady@gmail.com> Co-authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Cheng Pan <chengpan@apache.org> (cherry picked from commit adbd38a926b2da095dfd96c558df8ec9bec05076) Signed-off-by: Cheng Pan <chengpan@apache.org>

Commit:adbd38a
Author:Fu Chen
Committer:Cheng Pan

[CELEBORN-726][FOLLOWUP] Update data replication terminology from `master/slave` to `primary/replica` in the codebase ### What changes were proposed in this pull request? As title ### Why are the changes needed? In order to distinguish it from the existing master/worker, refactor data replication terminology to 'primary/replica' for improved clarity and inclusivity in the codebase ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing tests Closes #1639 from cfmcgrady/primary-replica. Lead-authored-by: Fu Chen <cfmcgrady@gmail.com> Co-authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Cheng Pan <chengpan@apache.org>

Commit:616898b
Author:Angerszhuuuu
Committer:Cheng Pan

[CELEBORN-739] Rename HeartbeatResponse to HeartbeatFromWorkerResponse ### What changes were proposed in this pull request? Rename HeartbeatResponse to HeartbeatFromWorkerResponse ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1651 from AngersZhuuuu/CELEBORN-739. Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org> (cherry picked from commit 1a53db22cec1a1b393949898857d2301a9dbadb8) Signed-off-by: Cheng Pan <chengpan@apache.org>

Commit:1a53db2
Author:Angerszhuuuu
Committer:Cheng Pan

[CELEBORN-739] Rename HeartbeatResponse to HeartbeatFromWorkerResponse ### What changes were proposed in this pull request? Rename HeartbeatResponse to HeartbeatFromWorkerResponse ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1651 from AngersZhuuuu/CELEBORN-739. Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org>

Commit:ef093dd
Author:Angerszhuuuu
Committer:Angerszhuuuu

[CELEBORN-735] Remove unused RPC GetWorkerInfo & GetWorkerInfosResponse ### What changes were proposed in this pull request? Remove unused RPC GetWorkerInfo & GetWorkerInfosResponse ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1647 from AngersZhuuuu/CELEBORN-735. Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com> Co-authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com> (cherry picked from commit 4c4e18b0d6e40b57326cc431fd79bbaf2c43353f) Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>

Commit:4c4e18b
Author:Angerszhuuuu

[CELEBORN-735] Remove unused RPC GetWorkerInfo & GetWorkerInfosResponse ### What changes were proposed in this pull request? Remove unused RPC GetWorkerInfo & GetWorkerInfosResponse ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1647 from AngersZhuuuu/CELEBORN-735. Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com> Co-authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>

Commit:4c7bed2
Author:Angerszhuuuu
Committer:Angerszhuuuu

[CELEBORN-734] Remove unused RPC ReregisterWorkerResonse ### What changes were proposed in this pull request? Remove unused RPC ReregisterWorkerResonse ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1646 from AngersZhuuuu/CELEBORN-734. Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com> (cherry picked from commit a672db719a5b2eb3be43f26b52cfc6c321fa15b0) Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>

Commit:a672db7
Author:Angerszhuuuu

[CELEBORN-734] Remove unused RPC ReregisterWorkerResonse ### What changes were proposed in this pull request? Remove unused RPC ReregisterWorkerResonse ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1646 from AngersZhuuuu/CELEBORN-734. Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>

Commit:d6dd8e4
Author:Angerszhuuuu
Committer:Cheng Pan

[CELEBORN-666][FOLLOWUP] Rename all RPC blacklist fields ### What changes were proposed in this pull request? In this pr, we rename all RPC blacklist fields, it won't have have compatibility issues. For RPC `GetBlacklist` and `GetBlacklistResponse` we won't change it, since it won't be used in next release, so we can remove these two RPC in next release. ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1643 from AngersZhuuuu/CELEBORN-666-RPC. Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org> (cherry picked from commit 590198eceab12e7e07d2b541a8308ee17805077a) Signed-off-by: Cheng Pan <chengpan@apache.org>

Commit:590198e
Author:Angerszhuuuu
Committer:Cheng Pan

[CELEBORN-666][FOLLOWUP] Rename all RPC blacklist fields ### What changes were proposed in this pull request? In this pr, we rename all RPC blacklist fields, it won't have have compatibility issues. For RPC `GetBlacklist` and `GetBlacklistResponse` we won't change it, since it won't be used in next release, so we can remove these two RPC in next release. ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1643 from AngersZhuuuu/CELEBORN-666-RPC. Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org>

Commit:f922d99
Author:Angerszhuuuu
Committer:Cheng Pan

[CELEBORN-732] Remove unused RPC ThreadDump & ThreadDumpResponse ### What changes were proposed in this pull request? Remove unused RPC ThreadDump & ThreadDumpResponse ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1645 from AngersZhuuuu/CELEBORN-732. Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org> (cherry picked from commit ad13b04f2e7efe77ef130af66058f5879ed89029) Signed-off-by: Cheng Pan <chengpan@apache.org>

Commit:ad13b04
Author:Angerszhuuuu
Committer:Cheng Pan

[CELEBORN-732] Remove unused RPC ThreadDump & ThreadDumpResponse ### What changes were proposed in this pull request? Remove unused RPC ThreadDump & ThreadDumpResponse ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1645 from AngersZhuuuu/CELEBORN-732. Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org>

Commit:e88d5ff
Author:Angerszhuuuu
Committer:Cheng Pan

[CELEBORN-730] Remove unused SlaveLostResponse ### What changes were proposed in this pull request? Remove unused SlaveLostResponse ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1644 from AngersZhuuuu/CELEBORN-730. Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org> (cherry picked from commit 63f22342e92de666dede05f08281c26a63ddb7ef) Signed-off-by: Cheng Pan <chengpan@apache.org>

Commit:63f2234
Author:Angerszhuuuu
Committer:Cheng Pan

[CELEBORN-730] Remove unused SlaveLostResponse ### What changes were proposed in this pull request? Remove unused SlaveLostResponse ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1644 from AngersZhuuuu/CELEBORN-730. Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org>

Commit:9e3a530
Author:Cheng Pan
Committer:zky.zhoukeyong

[CELEBORN-729] Fix typo PbRegisterShuffle#numMappers ### What changes were proposed in this pull request? Fix typo `numMapppers`, should be `numMappers` ### Why are the changes needed? Fix typo ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Protobuf serde depends on message field seq no, not name. Closes #1642 from pan3793/CELEBORN-729. Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> (cherry picked from commit 3d7c1fa0ae1deed8abca8951bd0193486301c700) Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:3d7c1fa
Author:Cheng Pan
Committer:zky.zhoukeyong

[CELEBORN-729] Fix typo PbRegisterShuffle#numMappers ### What changes were proposed in this pull request? Fix typo `numMapppers`, should be `numMappers` ### Why are the changes needed? Fix typo ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Protobuf serde depends on message field seq no, not name. Closes #1642 from pan3793/CELEBORN-729. Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

Commit:79cdc44
Author:zhongqiang.czq
Committer:Cheng Pan

[CELEBORN-724] Fix the compatibility of HeartbeatFromApplicationRespo… …nse with lower versions ### What changes were proposed in this pull request? The master side will check HeartbeatFromApplication's reply field. if reply is true then it replies HeartbeatFromApplicationResponse otherwise OneWayMessageResponse. The reply field is default false before the version 0.2.1, so master can be compatible with older client version ### Why are the changes needed? Before the version `0.2.1`, the response of HeartbeatFromApplication is` OneWayMessageResponse`, but from `0.3.0`, the response of HeartbeatFromApplication is modified to `HeartbeatFromApplicationResponse`. if the version of `client side `is `0.2.1` and the version of `server side is 0.3.0`, the `compatiblity issue `will occur. The following compatiblity error will be printted. ``` java java.io.InvalidObjectException: enum constant HEARTBEAT_FROM_APPLICATION_RESPONSE does not exist in class org.apache.celeborn.common.protocol.MessageType at java.io.ObjectInputStream.readEnum(ObjectInputStream.java:2157) ~[?:1.8.0_362] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1662) ~[?:1.8.0_362] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430) ~[?:1.8.0_362] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354) ~[?:1.8.0_362] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212) ~[?:1.8.0_362] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) ~[?:1.8.0_362] at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502) ~[?:1.8.0_362] at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460) ~[?:1.8.0_362] at org.apache.celeborn.common.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76) ~[celeborn-client-spark-3-shaded_2.12-0.2.1-incubating.jar:?] ``` ``` java Caused by: java.lang.ClassCastException: Cannot cast org.apache.celeborn.common.protocol.message.ControlMessages$HeartbeatFromApplicationResponse to org.apache.celeborn.common.protocol.message.ControlMessages$OneWayMessageResponse$ at java.lang.Class.cast(Class.java:3369) ~[?:1.8.0_362] at scala.concurrent.Future.$anonfun$mapTo$1(Future.scala:500) ~[scala-library-2.12.15.jar:?] at scala.util.Success.$anonfun$map$1(Try.scala:255) ~[scala-library-2.12.15.jar:?] at scala.util.Success.map(Try.scala:213) ~[scala-library-2.12.15.jar:?] at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) ~[scala-library-2.12.15.jar:?] at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?] at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?] at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) ~[scala-library-2.12.15.jar:?] at scala.concurrent.BatchingExecutor$Batch.processBatch$1(BatchingExecutor.scala:67) ~[scala-library-2.12.15.jar:?] at scala.concurrent.BatchingExecutor$Batch.$anonfun$run$1(BatchingExecutor.scala:82) ~[scala-library-2.12.15.jar:?] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?] at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85) ~[scala-library-2.12.15.jar:?] at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:59) ~[scala-library-2.12.15.jar:?] at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:875) ~[scala-library-2.12.15.jar:?] at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:110) ~[scala-library-2.12.15.jar:?] at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:107) ~[scala-library-2.12.15.jar:?] at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:873) ~[scala-library-2.12.15.jar:?] at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72) ~[scala-library-2.12.15.jar:?] at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288) ~[scala-library-2.12.15.jar:?] at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288) ~[scala-library-2.12.15.jar:?] at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288) ~[scala-library-2.12.15.jar:?] at scala.concurrent.Promise.trySuccess(Promise.scala:94) ~[scala-library-2.12.15.jar:?] at scala.concurrent.Promise.trySuccess$(Promise.scala:94) ~[scala-library-2.12.15.jar:?] at scala.concurrent.impl.Promise$DefaultPromise.trySuccess(Promise.scala:187) ~[scala-library-2.12.15.jar:?] at org.apache.celeborn.common.rpc.netty.NettyRpcEnv.onSuccess$1(NettyRpcEnv.scala:218) ~[celeborn-client-spark-3-shaded_2.12-0.2.1-incubating.jar:?] ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? The pr is tested manually and the testing process is as follows: 1. server side is deploy using the code of latest branch-0.3. 2. spark client is deploy the version of 0.2.1, then run spark-sql to execute 3 tpcds queries( query1.sql/querey2/quere3.sql whose datasize is 1T), finnally verify that the queries are executed successfully and no above compatiblity error printted 3. spark client is deploy the version of 0.3.0, then run spark-sql to execute 3 tpcds queries( query1.sql/querey2/quere3.sql whose datasize is 1T), finnally verify that the queries are executed successfully and no above compatiblity error printted This patch had conflicts when merged, resolved by Committer: Cheng Pan <chengpan@apache.org> Closes #1635 from zhongqiangczq/heartbeat2. Authored-by: zhongqiang.czq <zhongqiang.czq@alibaba-inc.com> Signed-off-by: Cheng Pan <chengpan@apache.org> (cherry picked from commit 374d735ae598240aa8722d09dbfc80f11c3b30e5) Signed-off-by: Cheng Pan <chengpan@apache.org>

Commit:374d735
Author:zhongqiang.czq
Committer:Cheng Pan

[CELEBORN-724] Fix the compatibility of HeartbeatFromApplicationRespo… …nse with lower versions ### What changes were proposed in this pull request? The master side will check HeartbeatFromApplication's reply field. if reply is true then it replies HeartbeatFromApplicationResponse otherwise OneWayMessageResponse. The reply field is default false before the version 0.2.1, so master can be compatible with older client version ### Why are the changes needed? Before the version `0.2.1`, the response of HeartbeatFromApplication is` OneWayMessageResponse`, but from `0.3.0`, the response of HeartbeatFromApplication is modified to `HeartbeatFromApplicationResponse`. if the version of `client side `is `0.2.1` and the version of `server side is 0.3.0`, the `compatiblity issue `will occur. The following compatiblity error will be printted. ``` java java.io.InvalidObjectException: enum constant HEARTBEAT_FROM_APPLICATION_RESPONSE does not exist in class org.apache.celeborn.common.protocol.MessageType at java.io.ObjectInputStream.readEnum(ObjectInputStream.java:2157) ~[?:1.8.0_362] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1662) ~[?:1.8.0_362] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430) ~[?:1.8.0_362] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354) ~[?:1.8.0_362] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212) ~[?:1.8.0_362] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) ~[?:1.8.0_362] at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502) ~[?:1.8.0_362] at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460) ~[?:1.8.0_362] at org.apache.celeborn.common.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76) ~[celeborn-client-spark-3-shaded_2.12-0.2.1-incubating.jar:?] ``` ``` java Caused by: java.lang.ClassCastException: Cannot cast org.apache.celeborn.common.protocol.message.ControlMessages$HeartbeatFromApplicationResponse to org.apache.celeborn.common.protocol.message.ControlMessages$OneWayMessageResponse$ at java.lang.Class.cast(Class.java:3369) ~[?:1.8.0_362] at scala.concurrent.Future.$anonfun$mapTo$1(Future.scala:500) ~[scala-library-2.12.15.jar:?] at scala.util.Success.$anonfun$map$1(Try.scala:255) ~[scala-library-2.12.15.jar:?] at scala.util.Success.map(Try.scala:213) ~[scala-library-2.12.15.jar:?] at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) ~[scala-library-2.12.15.jar:?] at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?] at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?] at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) ~[scala-library-2.12.15.jar:?] at scala.concurrent.BatchingExecutor$Batch.processBatch$1(BatchingExecutor.scala:67) ~[scala-library-2.12.15.jar:?] at scala.concurrent.BatchingExecutor$Batch.$anonfun$run$1(BatchingExecutor.scala:82) ~[scala-library-2.12.15.jar:?] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?] at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85) ~[scala-library-2.12.15.jar:?] at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:59) ~[scala-library-2.12.15.jar:?] at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:875) ~[scala-library-2.12.15.jar:?] at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:110) ~[scala-library-2.12.15.jar:?] at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:107) ~[scala-library-2.12.15.jar:?] at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:873) ~[scala-library-2.12.15.jar:?] at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72) ~[scala-library-2.12.15.jar:?] at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288) ~[scala-library-2.12.15.jar:?] at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288) ~[scala-library-2.12.15.jar:?] at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288) ~[scala-library-2.12.15.jar:?] at scala.concurrent.Promise.trySuccess(Promise.scala:94) ~[scala-library-2.12.15.jar:?] at scala.concurrent.Promise.trySuccess$(Promise.scala:94) ~[scala-library-2.12.15.jar:?] at scala.concurrent.impl.Promise$DefaultPromise.trySuccess(Promise.scala:187) ~[scala-library-2.12.15.jar:?] at org.apache.celeborn.common.rpc.netty.NettyRpcEnv.onSuccess$1(NettyRpcEnv.scala:218) ~[celeborn-client-spark-3-shaded_2.12-0.2.1-incubating.jar:?] ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? The pr is tested manually and the testing process is as follows: 1. server side is deploy using the code of latest branch-0.3. 2. spark client is deploy the version of 0.2.1, then run spark-sql to execute 3 tpcds queries( query1.sql/querey2/quere3.sql whose datasize is 1T), finnally verify that the queries are executed successfully and no above compatiblity error printted 3. spark client is deploy the version of 0.3.0, then run spark-sql to execute 3 tpcds queries( query1.sql/querey2/quere3.sql whose datasize is 1T), finnally verify that the queries are executed successfully and no above compatiblity error printted This patch had conflicts when merged, resolved by Committer: Cheng Pan <chengpan@apache.org> Closes #1635 from zhongqiangczq/heartbeat2. Authored-by: zhongqiang.czq <zhongqiang.czq@alibaba-inc.com> Signed-off-by: Cheng Pan <chengpan@apache.org>

Commit:5dd44fc
Author:zky.zhoukeyong
Committer:Shuang

[CELEBORN-656] Batch revive RPCs in client to avoid too many requests ### What changes were proposed in this pull request? This PR batches revive requests and periodically send to LifecycleManager to reduce number or RPC requests. To be more detailed. This PR changes Revive message to support multiple unique partitions, and also passes a set unique mapIds for checking MapEnd. Each time ShuffleClientImpl wants to revive, it adds a ReviveRquest to ReviveManager and wait for result. ReviveManager batches revive requests and periodically send to LifecycleManager (deduplicated by partitionId). LifecycleManager constructs ChangeLocationsCallContext and after all locations are notified, it replies to ShuffleClientImpl. ### Why are the changes needed? In my test 3T TPCDS q23a with 3 Celeborn workers, when kill a worker, the LifecycleManger will receive 4.8w Revive requests: ``` [emr-usermaster-1-1 logs]$ cat spark-emr-user-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-master-1-1.c-fa08904e94c028d1.out.1 |grep -i revive |wc -l 64364 ``` After this PR, number of ReviveBatch requests reduces to 708: ``` [emr-usermaster-1-1 logs]$ cat spark-emr-user-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-master-1-1.c-fa08904e94c028d1.out |grep -i revive |wc -l 2573 ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual test. I have tested: 1. Disable graceful shutdown, kill one worker, job succeeds 2. Disable graceful shutdown, kill two workers successively, job fails as expected 3. Enable graceful shutdown, restart two workers successively, job succeeds 4. Enable graceful shutdown, restart two workers successively, then kill the third one, job succeeds Closes #1588 from waitinfuture/656-2. Lead-authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> Co-authored-by: Keyong Zhou <zhouky@apache.org> Co-authored-by: Keyong Zhou <waitinfuture@gmail.com> Signed-off-by: Shuang <lvshuang.tb@gmail.com> (cherry picked from commit 57b0e815cf861ee975a70c0d9744cb04a3db2393) Signed-off-by: Shuang <lvshuang.tb@gmail.com>