Proto commits in apache/incubator-uniffle

These 73 commits are when the Protocol Buffers files have changed:

Commit:cc440df
Author:yl09099
Committer:GitHub

[#2469] feat(spark): Introducing a dedicated shuffleId of uniffle for stage retry (#2471) ### What changes were proposed in this pull request? Use the auto-incrementing shuffleId to replace the shuffleId of Spark. When Stage retry occurs, use the auto-incrementing shuffleId of Uniffle to avoid clearing the existing Shuffle data of the Shuffle Server. ### Why are the changes needed? Design doc: https://docs.google.com/document/d/1TPxg4CUNklg26-JIcKCeIDwZb9lWKj3mIu-qjMcj0wU/edit?usp=sharing Fix: #2470 Fix: #2469 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UT.

The documentation is generated from this commit.

Commit:057dcd2
Author:Junfan Zhang
Committer:GitHub

[#2460] feat(spark)(part-4): Hybrid storage reading statistics (#2468) ### What changes were proposed in this pull request? 1. Add support of hybrid storage reading statistics ![image](https://github.com/user-attachments/assets/c63c95fc-9e32-4f80-b251-ce1cdc811883) ### Why are the changes needed? followup #2460 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Internal tests.

Commit:4e3dfbc
Author:Junfan Zhang
Committer:GitHub

[#2460] feat(spark)(part-1): Introducing spark uniffle ui for better observability (#2459) ### What changes were proposed in this pull request? Introducing spark uniffle ui for better observability. ![image](https://github.com/user-attachments/assets/f2aff734-075b-4352-937f-9d0918f7193d) ### Why are the changes needed? Through the spark uniffle UI, we could find out slow shuffle-servers more easiler. ### Does this PR introduce _any_ user-facing change? Yes. `spark.plugins org.apache.spark.UnifflePlugin` ### How was this patch tested? Internal tests

Commit:76a4dcf
Author:Junfan Zhang
Committer:GitHub

feat(client): Introduce `LOAD_BALANCE` mode for partition split (#2408) ### What changes were proposed in this pull request? Introduce the load_balance mode for partition split ### Why are the changes needed? Firstly, thanks the great work to @maobaolong . This work is based on the #2093, that introduces the load_balance mode for the partition split. As we know, if the partition is big, the partition split will be activated to reassign to another server. For the default impl, the reassign logic is pipeline. it will reassign for first server -> second -> third until reaching the max reassignment server num limit. But for the huge partition with huge write throughput at the same time, I hope this can write the multi servers for load balance to speed up writing. ### Does this PR introduce _any_ user-facing change? Yes. 1. `rss.client.reassign.partitionSplitMode`. Default value PIPELINE (that is consistent with previous codebase) 2. `rss.client.reassign.partitionSplitLoadBalanceServerNumber` . Default value is 10. Only valid for load balance mode. ### How was this patch tested? 1. Internal spark jobs tests

Commit:ea3ec58
Author:Junfan Zhang
Committer:GitHub

chore: Remove the rust impl (#2373) ### What changes were proposed in this pull request? To remove the rust impl. ### Why are the changes needed? Sincerely say that rust impl lacks the necessary maintainess and block the normal github workflow. So it's time to remove it. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Needn't

Commit:0552d3b
Author:yl09099
Committer:GitHub

[#1844] fix(spark): Reassign shuffle servers when retrying stage (#1845) ### What changes were proposed in this pull request? If the Shuffle Server is not reassigned after the Retry is triggered at the Stage, data will be lost. Therefore, reassign the Shuffle Server after the Retry. question: Error: Failures: Error: RSSStageDynamicServerReWriteTest.testRSSStageResubmit:119-SparkIntegrationTestBase.run:64->SparkIntegrationTestBase.verifyTestResult:149 expected: <1000> but was: <970>. ### Why are the changes needed? Fix: #1844 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Presence test.

Commit:d653897
Author:maobaolong
Committer:Rory

[#436] feat(client,server): Introduce multi-part LocalStorageManager (#2253) ### What changes were proposed in this pull request? - Introduce a factory to create specific LocalStorageManager by config. - Introduce multiply disk LocalStorageManager. ### Why are the changes needed? Fix: #436 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Existing UTs and new added UT - Tested on our pressure test cluster. - new client -> old server ✓ - new client -> new server ✓ - old client -> new server ❌, so we have to upgraded client first, than upgrade the servers

Commit:d2f8ea0
Author:maobaolong
Committer:Rory

[MINOR] feat(server,dashboard,coordinator): Report configured metrics of server to coordinator and display to dashboard (#2239) ### What changes were proposed in this pull request? Report configured metrics of server to coordinator and display to dashboard ### Why are the changes needed? With this feature, it can be extensible for displaying server state by dashboard. In the beginning, we only need to display the app number in the dashboard, after a few days, we want to add partition with node in to dashboard, maybe no more days later, we need another value displayed in to dashboard. So it is not extensible if we add it one by one, this is why I invent this feature, it can be easy to let user config the metrics what they think it is necessary and should be displayed in the dashboard. ### Does this PR introduce _any_ user-facing change? - new config: rss.server.displayMetricsList ### How was this patch tested? Test Locally. <img width="288" alt="image" src="https://github.com/user-attachments/assets/a10fdbe4-08fa-4ccb-8614-80b2447b30f9">

Commit:b7d391c
Author:maobaolong
Committer:GitHub

[#436] feat(client,server): Introduce multi-part LocalStorageManager (#2253) ### What changes were proposed in this pull request? - Introduce a factory to create specific LocalStorageManager by config. - Introduce multiply disk LocalStorageManager. ### Why are the changes needed? Fix: #436 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Existing UTs and new added UT - Tested on our pressure test cluster. - new client -> old server ✓ - new client -> new server ✓ - old client -> new server ❌, so we have to upgraded client first, than upgrade the servers

Commit:910823d
Author:maobaolong
Committer:GitHub

[MINOR] feat(server,dashboard,coordinator): Report configured metrics of server to coordinator and display to dashboard (#2239) ### What changes were proposed in this pull request? Report configured metrics of server to coordinator and display to dashboard ### Why are the changes needed? With this feature, it can be extensible for displaying server state by dashboard. In the beginning, we only need to display the app number in the dashboard, after a few days, we want to add partition with node in to dashboard, maybe no more days later, we need another value displayed in to dashboard. So it is not extensible if we add it one by one, this is why I invent this feature, it can be easy to let user config the metrics what they think it is necessary and should be displayed in the dashboard. ### Does this PR introduce _any_ user-facing change? - new config: rss.server.displayMetricsList ### How was this patch tested? Test Locally. <img width="288" alt="image" src="https://github.com/user-attachments/assets/a10fdbe4-08fa-4ccb-8614-80b2447b30f9">

Commit:051a247
Author:maobaolong
Committer:GitHub

[#2086] feat(spark): Support cut partition to slices and served by multiply server (#2093) ### What changes were proposed in this pull request? Support sliced store partition to multiply server. Limitation: - Only finished tested the netty mode. ### Why are the changes needed? Fix: #2086 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Start multiply servers and coordinator on local - Start a spark standalone env on local - Start spark-shell and execute `test.scala` ```Console bin/spark-shell --master spark://localhost:7077 --deploy-mode client --conf spark.rss.client.reassign.blockRetryMaxTimes=3 --conf spark.rss.writer.buffer.spill.size=30 --conf spark.rss.client.reassign.enabled=true --conf spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager --conf spark.rss.coordinator.quorum=localhost:19999 --conf spark.rss.storage.type=LOCALFILE --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.rss.test.mode.enable=true --conf spark.rss.client.type=GRPC_NETTY --conf spark.sql.shuffle.partitions=1 -i test.scala ``` - test.scala ```scala val data = sc.parallelize(Seq(("A", 1), ("B", 2), ("C", 3), ("A", 4), ("B", 5), ("A", 6), ("A", 7),("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7))); val result = data.reduceByKey(_ + _); result.collect().foreach(println); System.exit(0); ``` <img width="410" alt="image" src="https://github.com/user-attachments/assets/7c72fa3e-cfb5-4361-9875-a82b6aeeedfb">

Commit:fdcb54c
Author:maobaolong
Committer:GitHub

[#2197] Support reg app conf to server and avoid update committed/cached blockIds bitmap (#2196) ### What changes were proposed in this pull request? Support reg app conf to server. ### Why are the changes needed? Fix #2197 Avoid update committed/cached blockIds bitmap while storage type is with memory. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Locally

Commit:87f9b6f
Author:maobaolong
Committer:GitHub

[MINOR]improvement(client/server): (RemoteMerger) Refactor to use mergeContext collect the arguments related (#2195) ### What changes were proposed in this pull request? Refactor to use mergeContext collect the arguments related to remote merger ### Why are the changes needed? - Make code clean and friendly to other developer who do not attention to `Remote Merger`. - Without api change while extends the `mergeContext`. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? No need, just refactor.

Commit:3a35b0f
Author:leewish
Committer:GitHub

[#2207] feat(dashboard): Add the write information of appinfo in Shuflle Server heartbeat (#2208) ### What changes were proposed in this pull request? Add the write information of appinfo in Shuflle Server heartbeat ### Why are the changes needed? Fix: #2207 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Locally Co-authored-by: wenlongwlli <wenlongwlli@tencent.com>

Commit:d170004
Author:zhengchenyu
Committer:GitHub

[#1749] feat(remote merge): Introduce new reader for reading sorted data. (#2034) ### What changes were proposed in this pull request? Introduce new reader for reading sorted data. Since #1748 already provides methods for merging blocks, we need to provide a method for reading merged block. The record obtained from the getSortedShuffleData method is sorted using the comparatorClassName which is passed by registerShuffle. ### Why are the changes needed? Fix: #1749 ### Does this PR introduce _any_ user-facing change? Yes, add doc in a separated issue ### How was this patch tested? unit test, integration test, test real job in cluster.

Commit:69e4cde
Author:maobaolong
Committer:GitHub

[#1888] feat(server): Reject requireBuffer/sendShuffleData for an application if one of the partitions exceeds the limit (#1889) ### What changes were proposed in this pull request? Reject the `requireBuffer` and `sendShuffleData` requests for an application if one of the partitions exceeds the limit. Introduce a config to limit the maximum of partition size, the client will receive an exception with message to show the partition size and the configured max partition size. ### Why are the changes needed? Fix: #1888 ### Does this PR introduce _any_ user-facing change? Yes. this PR introduced a new config key `rss.server.huge-partition.size.hard.limit` with default value Long.MAX_VALUE to keep consistent with the previous code. ### How was this patch tested? Manually tested in our env: - Configure `rss.server.huge-partition.size.hard.limit` to a small size and wait for the expected exception.

Commit:6e8f68a
Author:leewish
Committer:GitHub

[#1977] improvment(dashboard): Add version and git commit id in dashboard (#1964) ### What changes were proposed in this pull request? Display version and git commit id in dashboard ### Why are the changes needed? The version and git commit id can be used to easily locate whether the component has been upgraded and troubleshoot problems. ### Does this PR introduce _any_ user-facing change? User can view version and git commit id from dashboard ### How was this patch tested? Use the latest version, open the dashboard ui, and you can see the version and git commit id on the Coordinator and Shuffle server pages. Submit a task, and you can see the version and git commit id of each application task in the Apps module on the Application page.

Commit:7a6d276
Author:maobaolong
Committer:GitHub

[MINOR] feat(server,coordinator): Heartbeat server startTimeMs to coordinator (#1975) ### What changes were proposed in this pull request? Server send start time to coordinator through heartbeat. ### Why are the changes needed? Get each server start time to know well which server ever restarted. ### Does this PR introduce _any_ user-facing change? User can the the server start time from rest api. ### How was this patch tested? curl http://<COORDINATOR_HOST>: <COORDINATOR_JETTY_PORT>/api/server/nodes

Commit:2c670d5
Author:kqhzz
Committer:GitHub

[#1924] feat(dashboard): Show Thread Dump, Conf and Metrics in DashBoard (#1927) ### What changes were proposed in this pull request? Add jetty_port in message ShuffleServerId. Modify dashboard, add some link in dashboard <img width="1086" alt="企业微信截图_ede78628-56ac-4b7e-86ab-84b224da7ce4" src="https://github.com/user-attachments/assets/a263871f-0a9a-4b1d-9ba6-0341708e0a2d"> <img width="1775" alt="企业微信截图_88b2c855-68e4-4054-94a3-d730f074a4b9" src="https://github.com/user-attachments/assets/480352d6-bf29-48bc-af73-a41fa1fd8248"> ### Why are the changes needed? Enhance dashboard capabilities Fix: #1924 ### Does this PR introduce _any_ user-facing change? No.

Commit:f984efb
Author:maobaolong
Committer:GitHub

[#1894] fix(server): Fix NPE caused by app not found issue (#1915) ### What changes were proposed in this pull request? Fix the NPE issue within shuffle server. ### Why are the changes needed? Fix: #1894 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Tested on our test cluster.

Commit:7794f0f
Author:Junfan Zhang
Committer:GitHub

[#1791] feat(spark)(coordinator): Take more infos on getting assignment to track app reassign/stageRetry (#1792) ### What changes were proposed in this pull request? Take more infos on getting assignment to track app reassign/stageRetry ### Why are the changes needed? For #1791 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests.

Commit:a22f95a
Author:Junfan Zhang
Committer:GitHub

[#1799] improvement(spark): Rename shuffleManager rpc to reassignOnStageResubmit (#1800)

Commit:375262a
Author:yl09099
Committer:GitHub

[#1579][part-1] fix(spark): Ensure all previous data is cleared for stage retry (#1762) ### What changes were proposed in this pull request? 1. clear out previous stage attempt data synchronously when registering the re-assignment shuffleIds. 2. rework the stage retry interface and rpc 3. introducing the stage version to avoid accepting the older data ### Why are the changes needed? Fix: https://github.com/apache/incubator-uniffle/issues/1579 If the previous stage attempt is in the purge queue in shuffle-server side, the retry stage writing will cause unknown exceptions, so we'd better to clear out all previous stage attempt data before re-registering This PR is to sync remove previous stage data when the first attempt writer is initialized. Does this PR introduce any user-facing change? No. ### How was this patch tested? Existing tests.

Commit:1c72cb7
Author:Junfan Zhang
Committer:GitHub

[#1608] improvement(spark3): Output more task level infos in driver side when reassigning on block sent failure (#1771) ### What changes were proposed in this pull request? 1. Output more task level logs (like taskId, executorId and so on) in driver side 2. Fix typo of `ReassignServersResponse` ### Why are the changes needed? For better seeing the more contexts when reassign happens ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Needn't

Commit:93f4347
Author:Junfan Zhang
Committer:GitHub

[#1709] feat(coordinator): Introduce pluggable `ClientConfApplyStrategy` for `fetchClientConf` rpc (#1710) ### What changes were proposed in this pull request? Introduce pluggable ClientConfApplyManager for fetchClientConf rpc ### Why are the changes needed? For #1709 ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? Unit tests

Commit:313d4e0
Author:Junfan Zhang
Committer:GitHub

[#1538] feat(spark): report blockIds to spark driver optionally (#1677) ### What changes were proposed in this pull request? Support report blockIds from shuffle-servers to spark driver optionally ### Why are the changes needed? Fix: #1538 ### Does this PR introduce _any_ user-facing change? Yes. `rss.client.blockId.selfManagedEnabled` is introduced, default value is false. ### How was this patch tested? Integration tests.

Commit:30bf8dc
Author:Junfan Zhang
Committer:GitHub

[#1608][part-5] feat(spark3): always use the available assignment (#1652) ### What changes were proposed in this pull request? 1. make the write client always use the latest available assignment for the following writing when the block reassign happens. 2. support multi time retry for partition reassign 3. limit the max reassign server num of one partition 4. refactor the reassign rpc 5. rename the faultyServer -> receivingFailureServer. #### Reassign whole process ![image](https://github.com/apache/incubator-uniffle/assets/8609142/8afa5386-be39-4ccb-9c10-95ffb3154939) #### Always using the latest assignment To acheive always using the latest assignment, I introduce the `TaskAttemptAssignment` to get the latest assignment for current task. The creating process of AddBlockEvent also will apply the latest assignment by `TaskAttemptAssignment` And it will be updated by the `reassignOnBlockSendFailure` rpc. That means the original reassign rpc response will be refactored and replaced by the whole latest `shuffleHandleInfo`. ### Why are the changes needed? This PR is the subtask for #1608. Leverging the #1615 / #1610 / #1609, we have implemented the reassign servers mechansim when write client encounters the server failure or unhealthy. But this is not good enough that will not share the faulty server state to the unstarted tasks and latter `AddBlockEvent` . ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? Unit and integration tests. Integration tests as follows: 1. `PartitionBlockDataReassignBasicTest` to validate the reassign mechanism valid 2. `PartitionBlockDataReassignMultiTimesTest` is to test the partition reassign mechanism of multiple retries. --------- Co-authored-by: Enrico Minack <github@enrico.minack.dev>

Commit:60fce8e
Author:Junfan Zhang
Committer:GitHub

[#1608][part-3] feat(spark3): support reading data from multiple reassigned servers (#1615) ### What changes were proposed in this pull request? Support reading from partition block data reassignment servers. ### Why are the changes needed? For: #1608 Writer has been writing data into reassignment servers, so it's necessary to read from reassignment servers. And the blockId will be stored in their owned partition servers, so this PR can read blockIds from these servers and support min-replica requirements at the same time. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? `PartitionBlockDataReassignTest` integration test.

Commit:1051d26
Author:Junfan Zhang
Committer:GitHub

[#1608][part-1] fix(spark): Only share the replacement servers for faulty servers in one stage (#1609) ### What changes were proposed in this pull request? 1. Lock the `shuffleHandle` to ensure the thread safe when reassigning partial server for tasks 2. Only share the replacement servers for faulty servers in one stage rather than the whole app 3. Simplify the reassignment logic, like the single one replacement server which will be supported in the future, so let's remove it currently. 4. correct the `partitionIds` type from `string` to `int` in proto ### Why are the changes needed? Fix: #1608 In current implementation of partition reassignment, it will share the same reassignment servers for the different stages, which will crash for app without registry. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UTs

Commit:dc890ff
Author:Enrico Minack
Committer:GitHub

[#731] feat(spark): Make blockid layout configurable for Spark clients (#1528) ### What changes were proposed in this pull request? Make bit-lengths in block id (block id layout) configurable through dynamic client config from coordinator or client config. Block id layout can be created from RssConf, or where that is not available, is being passed around. - Adds new options (defaults are equivalent to current values in `Constants`): - rss.client.blockId.sequenceNoBits - rss.client.blockId.partitionIdBits - rss.client.blockId.taskAttemptIdBits - Adds block id layout to two requests (default is layout with current values in `Constants`). Default values have moved from `Constants` into `BlockIdLayout`. The following replacements exist: - `PARTITION_ID_MAX_LENGTH`: `BlockIdLayout.DEFAULT.partitionIdBits` - `TASK_ATTEMPT_ID_MAX_LENGTH`: `BlockIdLayout.DEFAULT.taskAttemptIdBits` - `ATOMIC_INT_MAX_LENGTH`: `BlockIdLayout.DEFAULT.sequenceNoBits` ### Why are the changes needed? The bit-lengths of sequence number, partition id and task attempt id in block id are defined in `common/src/main/java/org/apache/uniffle/common/util/Constants.java`. Changing these requires recompiling and redeploying the project. Making this configurable in `coordinator.conf`, `server.conf` or client-side would very useful. Also see #1512, #749. Fixes #731. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Tests.

Commit:59aa30d
Author:xumanbu
Committer:GitHub

[#1373][part-1] feat(spark): partition write to multi servers leveraging from reassignment mechanism (#1445) ### What changes were proposed in this pull request? partition write to multi servers leveraging from reassignment mechanism ### Why are the changes needed? For: https://github.com/apache/incubator-uniffle/issues/1373 ### Does this PR introduce _any_ user-facing change? 1、add config `rss.server.dynamic.assign.enabled` for whether to reassign a faulty shuffle server. 2、support reassign a new shuffle server for send failed blocks 3、ShuffleReader read partition in muitl server implement will in next pr ### How was this patch tested? UTs --------- Co-authored-by: jam.xu <jam.xu@vipshop.com>

Commit:f6d1e7a
Author:Junfan Zhang
Committer:GitHub

[#1476] feat(rust): Provide dedicated unregister app rpc interface (#1511) ### What changes were proposed in this pull request? Provide dedicated unregister app rpc interface for rust based server ### Why are the changes needed? Fix: #1476 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests.

Commit:9471cba
Author:Yiqi You
Committer:GitHub

[#1476] feat(spark): Provide dedicated unregister app rpc interface (#1510) ### What changes were proposed in this pull request? Introduce dedicated unregisterApp rpc interface which is only called once when unregister shuffle ### Why are the changes needed? Fix: [#1476](https://github.com/apache/incubator-uniffle/issues/1476) ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UT

Commit:08e8b63
Author:Qing
Committer:GitHub

[#1119] improvement(client): Explicitly throw `BUFFER_LIMIT_OF_HUGE_PARTITION` (#1425) ### What changes were proposed in this pull request? Explicitly throw `BUFFER_LIMIT_OF_HUGE_PARTITION` instead of `NO_BUFFER` ### Why are the changes needed? Fix: https://github.com/apache/incubator-uniffle/issues/1119 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? unit test

Commit:ecbf2e7
Author:yl09099
Committer:GitHub

[#960][part-4] feat(dashboard): Fix some display bugs and optimize the display format. (#1326) ### What changes were proposed in this pull request? 1.Fixed a bug that returned data from some interface calls. 2.Modify some memory to be represented in tape units. ### Why are the changes needed? The dashboard page cannot display some indicators. Fix: #960 ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Existing UT.

Commit:adafac4
Author:yl09099
Committer:GitHub

[#825][part-5] feat(spark): Adds the RPC interface to reassign the ShuffleServer list. (#1146) ### What changes were proposed in this pull request? Adds the RPC interface to reassign the ShuffleServer list. ### Why are the changes needed? Fix: #825 ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Existing UT.

Commit:1be07a9
Author:yl09099
Committer:GitHub

[#825][part-4] feat(spark): Report write failures to ShuffleManager. (#1258) ### What changes were proposed in this pull request? Send the write exception to the ShuffleServer to the ShuffleManager. ### Why are the changes needed? Fix: #825 ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Existing UT.

Commit:e9a62ee
Author:yl09099
Committer:GitHub

[#825][part-3] feat(spark): Get the ShuffleServer corresponding to the partition from ShuffleManager. (#1141) ### What changes were proposed in this pull request? ShuffleReader and ShuffleWriter get the ShuffleServer corresponding to the partition from ShuffleManager ### Why are the changes needed? Fix: #825 ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Existing UT.

Commit:fa6d162
Author:yl09099
Committer:GitHub

[#825][part-1] feat(spark): Add the RPC interface for reassigning ShuffleServer (#1137) ### What changes were proposed in this pull request? We need to provide an RPC interface for reassigning ShuffleServer, with support for excluding some ShuffleServer lists. Ⅰ. Overall objective: 1. During the shuffle write phase, the ShuffleServer reports faulty nodes and reallocates the ShuffleServer list; 2. Triggers a Stage level retry of SPARK. The shuffleServer node is excluded and reallocated before the retry. Ⅱ. Implementation logic diagram: ![image](https://github.com/apache/incubator-uniffle/assets/33595968/866c8292-e0ff-4532-b519-02f424f4c2fc) Ⅲ. As shown in the picture above: 1. During Shuffle registration, obtain the ShuffleServer list to be written through the RPC interface of a Coordinator Client by following the solid blue line step. The list is bound using ShuffleID. 2, the Task of Stage starts, solid steps, in accordance with the green by ShuffleManager Client RPC interface gets to be written for shuffleIdToShuffleHandleInfo ShuffleServer list; 3. In the Stage, if Task fails to write blocks to the ShuffleServer, press the steps in red to report ShuffleServer to FailedShuffleServerList in RSSShuffleManager through the RPC interface. 4. FailedShuffleServerList records the number of ShuffleServer failures. After the number of failures reaches the maximum number of retries of the Task level, follow the steps in dotted orange lines. Through the RPC interface of a Coordinator Client, obtain the list of ShuffleServer files to be written (the ShuffleServer files that fail to be written are excluded). After obtaining the list, go to Step 5 of the dotted orange line. Throwing a FetchFailed Exception triggers a stage-level retry for SPARK; 5. Attempt 1 is generated by the SPARK Stage level again. Pull the corresponding ShuffleServer list according to the green dotted line. ### Why are the changes needed? Such an interface is required when dynamically allocating a shuffleServer list. Fix: #825 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Add UT.

Commit:a1d6d50
Author:Junfan Zhang
Committer:GitHub

[#1206][part-2] feat(rust): introduce rust based shuffle-server (#1208) ### What changes were proposed in this pull request? Rust based shuffle-server is implemented by me in [riffle](https://github.com/zuston/riffle), it's time to contribute back to uniffle community. ### Why are the changes needed? For #1206 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UTs

Commit:0c3d60f
Author:yl09099
Committer:GitHub

[#937] feat: Add rest api for servernode list of losing connection and unhealthy (#938) ### What changes were proposed in this pull request? Obtain the list of ServerNodes whose heartbeat is lost from a Coordinator Server Obtain the list of ServerNodes whose unhealthy from a Coordinator Server ### Why are the changes needed? There is no interface for viewing lost and unhealthy lists Fix: #937 ### Does this PR introduce _any_ user-facing change? 1. Change in user-facing APIs. ### How was this patch tested? Updated UT: SimpleClusterManagerTest

Commit:8edefdf
Author:Junfan Zhang
Committer:GitHub

[#414] feat(client): support specifying per-partition's max concurrency to write in client side (#815) ### What changes were proposed in this pull request? 1. Support specifying per-partition's max concurrency to write in client side ### Why are the changes needed? The PR of #396 has introduced the concurrent HDFS writing for one partition, but the concurrency is determined by the server client. In order to increase flexibility, this PR supports specifying per-partition's max concurrency to write in client side ### Does this PR introduce _any_ user-facing change? Yes. The client conf of `<client_type>.rss.client.max.concurrency.per-partition.write` and `rss.server.client.max.concurrency.limit.per-partition.write` are introduced. ### How was this patch tested? 1. UTs

Commit:ea5a3ba
Author:advancedxy
Committer:GitHub

[#477][part-0] feat: add ShuffleManagerServer impl (#777) ### What changes were proposed in this pull request? 1. add shuffle manager proto 2. the corresponding client and server impls 3. add the RssShuffleManagerInterface ### Why are the changes needed? This is the first part of #477. To support re-submit spark stage, the ShuffleManagerServer should be introduced first. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added UTs and one integration test.

Commit:e38d799
Author:roryqi
Committer:GitHub

[#711] feat(netty): Add Netty port information for Shuffle Server (#712) ### What changes were proposed in this pull request? Add Netty port information for Shuffle Server ### Why are the changes needed? Let us can use old servers with Grpc and new servers with Netty at the same time. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Add new ut

Commit:1951376
Author:xianjingfeng
Committer:GitHub

[#80][Part-2] feat: Add RPC logic and heartbeat logic for decommisson (#663) ### What changes were proposed in this pull request? Add RPC logic and heartbeat logic for decommisson ### Why are the changes needed? Support shuffle server decommission. It is a part of #80 ### Does this PR introduce any user-facing change? No. ### How was this patch tested? UT

Commit:e802b93
Author:Junfan Zhang
Committer:GitHub

[ISSUE-378][HugePartition][Part-2] Introduce memory usage limit and data flush (#471) ### What changes were proposed in this pull request? 1. Introduce memory usage limit for huge partition to keep the regular partition writing stable 2. Once partition is marked as huge-partition, when its buffer size is greater than `rss.server.single.buffer.flush.threshold` value, single-buffer flush will be triggered whatever the single buffer flush is enabled or not ### Why are the changes needed? 1. To solve the problems mentioned by #378 ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? 1. UTs

Commit:19a8bac
Author:advancedxy
Committer:GitHub

[ISSUE-448][Feature] shuffle server report storage info (#449) ### What changes were proposed in this pull request? ShuffleServer reports local storage info about itself. This PR also defines a general message definition to extend remote distributed info. ### Why are the changes needed? To do better shuffle assignments and get more insight of shuffle server This addresses #448 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added UTs.

Commit:45e600c
Author:xianjingfeng
Committer:GitHub

Revert "[Improvement] Skip blocks when read from memory (#294)" (#403) This reverts commit 55191c43cc94dc0e72ca378e3c65b743bf66bbb0. ### What changes were proposed in this pull request? Revert #294 ### Why are the changes needed? BlockId is discontinuous, so BLOCKID_RANGE is not a good choice to filter memory data ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? No need

Commit:55191c4
Author:xianjingfeng
Committer:GitHub

[Improvement] Skip blocks when read from memory (#294) ### What changes were proposed in this pull request? Skip blocks which not in expected blockId range when read from memory. ### Why are the changes needed? 1.If we use AQE, every task will read data from all partitions. 2.If the data of the first shuffle server is incomplete, we need to read from another server if #276 is merged. Both of the above situations will lead to read redundant data from shuffle server. ### Does this PR introduce _any_ user-facing change? Set `rss.client.read.block.skip.strategy` to `BLOCKID_RANGE`. ### How was this patch tested? Already added

Commit:0e45f2d
Author:Junfan Zhang
Committer:GitHub

[Improvement][AQE] Support getting memory data skip by upstream task ids (#358) ### What changes were proposed in this pull request? Support getting memory data skip by upstream task ids ### Why are the changes needed? In current codebase, when the shuffle-server memory is large and job is optimized by AQE skew rule, the multiple readers of the same partition will get the shuffle data from the same shuffle-server. To avoid reading unused localfile/HDFS data, the PR of #137 has introduce the LOCAL_ORDER mechanism to filter the most of data. But for the storage of MEMORY, it still suffer from this. So this PR is to avoid reading unused data for one reader, by expectedTaskIds bitmap to filter. And this optimization is only enabled when AQE skew is applied. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? 1. UTs ### Benchmark #### Table Table1: 100g, dtypes: Array[(String, String)] = Array((v1,StringType), (k1,IntegerType)). And all columns of k1 have the same value (value = 10) Table2: 10 records, dtypes: Array[(String, String)] = Array((k2,IntegerType), (v2,StringType)). And it has the only one record of k2=10 #### Env Spark Resource Profile: 10 executors(1core2g) Shuffle-server Environment: 10 shuffle servers, 10g for buffer read and write. Spark Shuffle Client Config: storage type: MEMORY_LOCALFILE with LOCAL_ORDER SQL: spark.sql("select * from Table1,Table2 where k1 = k2").write.mode("overwrite").parquet("xxxxxx") #### Result __ESS__: cost `3min` __Uniffle without patch__: cost `11.6min` (2.1 + 9.5) __Uniffle with patch__: cost `3.5min` (2.1 + 1.4) Co-authored-by: xianjingfeng <583872483@qq.com>

Commit:8ff41a5
Author:jokercurry
Committer:GitHub

[Feature] Support user's app quota level limit (#311) ### What changes were proposed in this pull request? For issue #211 and the design document [https://docs.google.com/document/d/1MApSMFQgoS1VAoKbZjomqSRm0iTbSuKG1yvKNlWW65c/edit?usp=sharing](https://docs.google.com/document/d/1MApSMFQgoS1VAoKbZjomqSRm0iTbSuKG1yvKNlWW65c/edit?usp=sharing) ### Why are the changes needed? Better isolation of resources between different users. ### Does this PR introduce _any_ user-facing change? Add config `rss.coordinator.quota.default.app.num` to set default app number each user and `rss.coordinator.quota.default.path` to set a path to record the number of apps that each user can run. ### How was this patch tested? Add uts.

Commit:4004f44
Author:Xianming Lei
Committer:GitHub

[ISSUE-309][FEATURE] Support ShuffleServer latency metrics. (#327) ### What changes were proposed in this pull request? For https://github.com/apache/incubator-uniffle/issues/309, support ShuffleServer latency metrics. ### Why are the changes needed? Accurately determine whether the current service load has caused a large delay to the client's read and write. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Co-authored-by: leixianming <leixianming@didiglobal.com>

Commit:c5975a5
Author:Xianming Lei
Committer:Kaijie Chen

[ISSUE-239][BUG] RssUtils#transIndexDataToSegments should consider the length of the data file (#275) ### What changes were proposed in this pull request? For issue#239, Fix inconsistent blocks when reading shuffle data. ### Why are the changes needed? This problem will cause reading shuffle data failed. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Already added UT Co-authored-by: leixianming <leixianming@didiglobal.com>

Commit:84f781f
Author:Xianming Lei
Committer:GitHub

[ISSUE-135][FOLLOWUP][Improvement][AQE] Assign adjacent partitions to the same ShuffleServer (#307) ### What changes were proposed in this pull request? Follow issue#136, allocate adjacent partitions to the same ShuffleServer. When the client calls getShuffleResultForMultiPart, the number of ShuffleServer requests is minimized ### Why are the changes needed? Bring some performance improvement ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Co-authored-by: leixianming <leixianming@didiglobal.com>

Commit:74949f5
Author:Junfan Zhang
Committer:GitHub

[ISSUE-137][Improvement][AQE] Sort MapId before the data are flushed (#293) ### What changes were proposed in this pull request? Introduce a new mechanism to determine how to write data to file, including directly append or append after sort. ### Why are the changes needed? In our internal uniffle deployment, 200+ shuffle-servers are in service. A single shuffle-server uses 4 SATA SSDs to be used as the localfile storage, the max network bandwidth is limited to 5G/s. The storageType of the shuffle-server is MEMORY_LOCALFILE. After monitoring the read_data_rate metric, I found it always will reach the max network bandwidth. However, at that time, the number of apps running was low. And only single disk usage is 100%. After digging into the shuffle-server’s log, I found almost all requests with the same AppId and the same Partition to get the shuffle data from the same partition data file. This indicates the reason for high disk utilization due to the hotspots of reading. It was found that this App’s shuffle-read was optimized by AQE skew data split, which causes the Uniffle shuffle-server high-pressure of network and diskIO. After catching this point, I analyzed the performance of historical tasks using different shuffle-services briefly. And in current implementation, one partition’s buffer will be flushed to disk once the size reaches the threshold of 64M. And the spark/mr uniffle client will fetch one batch data of 14M size(default value). That means for one buffer of one partition, the client needs to have 5 network interactions with the shuffle-server if the data with MapId is relatively discrete. To solve this problem, we could make the 64M buffer’s data sorted by MapId. That means for the uniffle client, ideally it will read one time in a single buffer. ### Does this PR introduce _any_ user-facing change? 1. Introduce the shuffle-data distribution type, NORMAL or LOCAL_ORDER, which can have other implementations, like GLOBAL_ORDER. 2. Make the segment split strategy as a general interface for above different data distribution type. ### How was this patch tested? 1. UTs 2. Spark Tests on offline hadoop cluster ### Benchmark Table1: 100g, dtypes: Array[(String, String)] = Array((v1,StringType), (k1,IntegerType)). And all columns of k1 have the same value (value = 10) Table2: 10 records, dtypes: Array[(String, String)] = Array((k2,IntegerType), (v2,StringType)). And it has the only one record of k2=10 Environment: 100 executors(1core2g) SQL: spark.sql("select * from Table1,Table2 where k1 = k2").write.mode("overwrite").parquet("xxxxxx") - Uniffle without patch: cost 12min - Uniffle with patch: cost 4min ### Reference 1. Design doc: https://docs.google.com/document/d/1G0cOFVJbYLf2oX1fiadh7zi2M6DlEcjTQTh4kSkb0LA/edit?usp=sharing

Commit:2429c67
Author:Xianming Lei
Committer:GitHub

[ISSUE-239][BUG] RssUtils#transIndexDataToSegments should consider the length of the data file (#275) ### What changes were proposed in this pull request? For issue#239, Fix inconsistent blocks when reading shuffle data. ### Why are the changes needed? This problem will cause reading shuffle data failed. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Already added UT Co-authored-by: leixianming <leixianming@didiglobal.com>

Commit:1ee6820
Author:Junfan Zhang
Committer:GitHub

Introduce data cleanup mechanism on stage level (#249) ### What changes were proposed in this pull request? Introduce data cleanup mechanism on stage level ### Why are the changes needed? This PR is to optimize the disk capacity. For example 1. For some spark ML jobs, it will run multiple stages and reserve large unused shuffle data in shuffle-servers. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UTs

Commit:6aa4379
Author:Xianming Lei
Committer:GitHub

[Improvement][AQE] Avoid calling getShuffleResult multiple times (#190) ###What changes were proposed in this pull request? For issue #136 , When we use AQE, we may call shuffleWriteClient.getShuffleResult multiple times. But if both partition 1 and partition 2 are on the server A, we call getShuffleResult(partition 1) to get data form server A, and then we call getShuffleResult(partition 2) to get data form server A, it's not necassray. We can get getShuffleResult(partition 1, partition 2) instead. ###Why are the changes needed? Improve getShuffleResult ###Does this PR introduce any user-facing change? No ###How was this patch tested? Added UT Co-authored-by: leixianming <leixianming@didiglobal.com>

Commit:abb9215
Author:Junfan Zhang
Committer:GitHub

Support storing shuffle data to secured dfs cluster (#53) ### What changes were proposed in this pull request? Support storing shuffle data to secured HDFS cluster by spark job user's own permission in shuffle server side. ### Why are the changes needed? When using the storage type of MEMEORY_LOCALFILE_HDFS, we meet some problems on flushing shuffle data to secured HDFS clusters due to lacking credentials. To solve this, keytabs need to be distributed to all shuffle servers and login/refresh by crontab or other ways. But this way is not secured, it’s better to flush data with corresponding HDFS users for data isolation. We hope that: 1. user A launched a Spark application, and send shuffle data to shuffle server. 2. Shuffle server should write shuffle data with "user A" into HDFS. 3. Reduce tasks (launched by user A) could read shuffle data in HDFS with user 4. Otherwise, user A may not have the permission to read shuffle data written by shuffle server (another user) if it it is not owned by A. More detail and motivation can be found in design doc: https://docs.google.com/document/d/1pIDwCwv8iwnXmFQeTZKA5tK55SRc_f0SPKYFONA5K70 ### Does this PR introduce _any_ user-facing change? Yes ### How was this patch tested? Manual tests and unit tests.

Commit:9a227da
Author:Junfan Zhang
Committer:GitHub

[Improvement] Introduce config to customize assignment server numbers in client side (#100) ### What changes were proposed in this pull request? [Improvement] Introduce config to customize assignment server numbers in client side. **Changelog** 1. Introduce the config of `<client_type>.rss.client.assignment.shuffle.nodes.max` ### Why are the changes needed? Now the assignment number specified by coordinator's conf of rss.coordinator.shuffle.nodes.max. But i think it's not suitable for all spark jobs. We should introduce new config to let client specify the assignment server number. rss.coordinator.shuffle.nodes.max should be as a max limitation of clients' number. ### Does this PR introduce _any_ user-facing change? YES. ### How was this patch tested? UT.

Commit:20d39e8
Author:Junfan Zhang
Committer:GitHub

[Log enhancement] return error message when getting assignment servers and log exception when initializing (#64) ### What changes were proposed in this pull request? Log enhancement: return error message when getting assignment servers and log exception when initializing ### Why are the changes needed? To be easy to find the cause of error. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? No need.

Commit:bdffcaa
Author:Junfan Zhang
Committer:GitHub

Introduce the extraProperties to support user-defined pluggable accessCheckers (#42) ### What changes were proposed in this pull request? Introduce the reservedData to extend more pluggable accessCheckers ### Why are the changes needed? In current codebase, the accessinfo only have acessid and tags. If we want to extend more AccessChecker in coordinator, the info is not enough. To solve this, i think introducing the reservedData is necessary. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? UTs

Commit:ec00ad9
Author:roryqi
Committer:GitHub

Rename package to o.a.uniffle (#6) Co-authored-by: roryqi <roryqi@tencent.com>

Commit:cdc8cb6
Author:Kaijie Chen
Committer:GitHub

Change license owner to ASF (#5) ### What changes were proposed in this pull request? Change license owner to ASF. Update LICENSE file, and all license headers. ### Why are the changes needed? It's a Apache Incubator project now. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? 1. `mvn apache-rat:check` 2. The text is replaced block by block in VSCode.

Commit:ac18880
Author:macduan
Committer:GitHub

[Feature] Remote storage can apply its own extra config (#143) ## What changes were proposed in this pull request? Add remote storage conf Server can apply the specific remote storage conf to its storage manager (HdfsStorageManager for now) client can apply specific conf of the remote storage it fetched The storage related hadoop conf and the hadoop conf in the spark context / mr jobConf should be isolated ## Why are the changes needed? Firestorm supports multiple remote storage and we may want to apply some extra/sepecific conf to different remote storage. ## Does this PR introduce any user-facing change? No ## How was this patch tested? UT and system test

Commit:dc56f6d
Author:Colin
Committer:GitHub

[Feature] Support multiple HDFS in Firestorm (Part2 for Shuffle Server & Driver) (#121) What changes were proposed in this pull request? The PR is target to support multiple HDFS with RSS cluster, and it's the 2nd part for Shuffle Server & Driver Why are the changes needed? To release I/O pressure when with multiple HDFS cluster Does this PR introduce any user-facing change? No How was this patch tested? new UT added

Commit:1a89399
Author:Colin
Committer:GitHub

[Feature] Support multiple HDFS in Firestorm (Part1 for Coordinator &… (#110) * [Feature] Support multiple HDFS in Firestorm (Part1 for Coordinator & Driver)

Commit:459acff
Author:macduan
Committer:GitHub

[Feature] add the dynamic client conf fetching and updating (#81) ### What changes were proposed in this pull request? Add api to fetch client conf from coordinator Dynamic apply the client conf ### Why are the changes needed? Support dynamic update client conf and avoid the requirement to add some client conf other than shuffle manager, client jar. For now, we only read the conf from local file or hdfs, and we could support db and kv store or leverage a config center in the future. ### Does this PR introduce any user-facing change? No ### How was this patch tested? UT & System test

Commit:2cd2175
Author:macduan
Committer:GitHub

[Feature] Add AccessManager (#61) # What changes were proposed in this pull request? Add access manager to handle cron task access requests and add checkers to check the candidate list and cluster load respectively. # Why are the changes needed? We need to add candidates checking and control of the access when the cluster load is high. # Does this PR introduce any user-facing change? Yes, need to update the client to enable this feature. # How was this patch tested? UT and system test.

Commit:c838158
Author:Colin
Committer:GitHub

Support store shuffle data in memory (#36) What changes were proposed in this pull request? This pr is target to store shuffle data in memory. New flush strategy and read strategy are involved. The benefits with this change are: Huge memory is useful now There has no wait for data flush after stage finish Reduce flush data to storage Why are the changes needed? new RPC interface is involved Does this PR introduce any user-facing change? Yes, user need to update configuration for the new feature How was this patch tested? UT add

Commit:cce8824
Author:roryqi
Committer:GitHub

[Feature] Firestorm supports node's health check (#14) ### What changes were proposed in this pull request? Firestorm supports node's health check. There are some full disk nodes in our cluster, they influence the shuffle service. We should exclude them.Currently we only check disk usage, and exclude the unhealthy nodes. ### Why are the changes needed? Because we should have the mechanism to find the servers that can't serve, and screen them ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? newly added UTs Co-authored-by: roryqi <roryqi@tencent.com>

Commit:cfc6c31
Author:macduan
Committer:GitHub

[Feature] Read index file first in local mode (#6) * add local part * add index read length * resolve comments * Adjust some error handling * Use BytesValue unsafe wrap instead of ByteString copyFrom * Remove some unused imports Co-authored-by: macduan <macduan@tencent.com>

Commit:44ff317
Author:colinma

Initial version