These 75 commits are the ones in which the Protocol Buffers files changed:
Commit: | 80a4501 | |
---|---|---|
Author: | Yida Wu | |
Committer: | Yida Wu |
IMPALA-13703: Cancel running queries before shutdown deadline Currently, when the graceful shutdown deadline is reached, the Impala daemon exits immediately, leaving any running queries unfinished. This approach is not quite graceful, as it may result in unreleased resources, such as scratch files in remote storage. This patch adds a new state to the graceful shutdown process. Before reaching the shutdown deadline, the Impala daemon will try to cancel any remaining running queries within a configurable time limit, set by the flag shutdown_query_cancel_period_s. If this time limit exceeds 20% of the total shutdown deadline, it is automatically capped at that value. The idea is to cancel queries only near the end of the graceful shutdown deadline; the 20% threshold lets us take a more aggressive approach to ensure the shutdown is graceful. If all queries are successfully canceled within this period, the server shuts down immediately. Otherwise, it shuts down once the deadline is reached, with queries still running. Tests: Passed core tests. Added test cases test_shutdown_coordinator_cancel_query, test_shutdown_executor_with_query_cancel_period and test_shutdown_coordinator_and_executor_cancel_query. Manually tested shutting down a coordinator or an executor with long-running queries; they were canceled. Change-Id: I1cac2e100d329644e21fdceb0b23901b08079130 Reviewed-on: http://gerrit.cloudera.org:8080/22422 Reviewed-by: Michael Smith <michael.smith@cloudera.com> Reviewed-by: Abhishek Rawat <arawat@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The documentation is generated from this commit.
Commit: | d7ee509 | |
---|---|---|
Author: | Xuebin Su | |
Committer: | Michael Smith |
IMPALA-12648: Add KILL QUERY statement To support killing queries programmatically, this patch adds a new type of SQL statement, the KILL QUERY statement, to cancel and unregister a query on any coordinator in the cluster. A KILL QUERY statement looks like ``` KILL QUERY '123:456'; ``` where `123:456` is the query id of the query we want to kill. We follow the syntax from HIVE-17483. For backward compatibility, 'KILL' and 'QUERY' are added as "unreserved keywords", like 'DEFAULT'. This allows all three keywords to still be used as identifiers. A user is authorized to kill a query only if the user is an admin or is the owner of the query. KILL QUERY statements are not affected by admission control. Implementation: Since we don't know in advance which impalad is the coordinator of the query we want to kill, we need to broadcast the kill request to all the coordinators in the cluster. Upon receiving a kill request, each coordinator checks whether it is the coordinator of the query: - If yes, it cancels and unregisters the query. - If no, it reports "Invalid or unknown query handle". Currently, a KILL QUERY statement is not interruptible; IMPALA-13663 is created for this. For authorization, this patch adds a custom handler of AuthorizationException for each statement to allow the exception to be handled by the backend. This is because we don't know whether the user is the owner of the query until we reach its coordinator. To support cancelling child queries, this patch changes ChildQuery::Cancel() to bypass the HS2 layer so that the session of the child query will not be added to the connection used to execute the KILL QUERY statement. Testing: - A new ParserTest case is added to test using "unreserved keywords" as identifiers. - New E2E test cases are added for the KILL QUERY statement. - Added a new dimension in TestCancellation to use the KILL QUERY statement. - Added file tests/common/cluster_config.py and made CustomClusterTestSuite.with_args() composable so that common cluster configs can be reused in custom cluster tests. Change-Id: If12d6e47b256b034ec444f17c7890aa3b40481c0 Reviewed-on: http://gerrit.cloudera.org:8080/21930 Reviewed-by: Riza Suminto <riza.suminto@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Reviewed-by: Michael Smith <michael.smith@cloudera.com>
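The commit describes broadcasting the kill request to every coordinator, but does not show the wire format. The sketch below is a minimal illustration of what such a coordinator-to-coordinator RPC could look like; the message names, field names, and field numbers are assumptions, not the actual definitions from the patch.

```proto
// Hypothetical kill-request message, broadcast to all coordinators.
// Only the query's own coordinator acts on it; the others report
// "Invalid or unknown query handle".
syntax = "proto2";
package illustrative;

message UniqueIdPB {
  required fixed64 hi = 1;
  required fixed64 lo = 2;
}

message KillQueryRequestPB {
  optional UniqueIdPB query_id = 1;
  // Effective user issuing KILL QUERY, used for the owner/admin check
  // that can only be made once the owning coordinator is reached.
  optional string requesting_user = 2;
  optional bool is_admin = 3;
}

message KillQueryResponsePB {
  optional string status = 1;  // OK, or the error to surface to the user
}
```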
Commit: | 3c939f0 | |
---|---|---|
Author: | Riza Suminto | |
Committer: | stiga-huang |
IMPALA-13040: Add waiting mechanism in UpdateFilterFromRemote It is possible for an UpdateFilterFromRemote RPC to arrive at an impalad executor before the QueryState of the destination query is created or has completed initialization. This patch adds a wait mechanism in the UpdateFilterFromRemote RPC endpoint to wait a few milliseconds until the QueryState exists and has completed initialization. The wait time is fixed at 500ms, with exponentially increasing sleep periods in between. If the wait time passes and the QueryState is still not found or initialized, the UpdateFilterFromRemote RPC is deemed failed and query execution moves on without the complete filter. Testing: - Add BE tests in network-util-test.cc - Add test_runtime_filter_aggregation.py::TestLateQueryStateInit - Pass exhaustive runs of test_runtime_filter_aggregation.py, test_query_live.py, and test_query_log.py Change-Id: I156d1f0c694b91ba34be70bc53ae9bacf924b3b9 Reviewed-on: http://gerrit.cloudera.org:8080/21383 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | 98739a8 | |
---|---|---|
Author: | Riza Suminto | |
Committer: | Impala Public Jenkins |
IMPALA-13083: Clarify REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION This patch improves the REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION error message by naming the specific configuration that must be adjusted so that the query can pass admission control. New fields 'per_backend_mem_to_admit_source' and 'coord_backend_mem_to_admit_source' of type MemLimitSourcePB are added to QuerySchedulePB. These fields explain which limiting factor drives the final numbers in 'per_backend_mem_to_admit' and 'coord_backend_mem_to_admit' respectively. In turn, admission control uses this information to compose a more informative error message that the user can act upon. The new error message pattern also explicitly mentions "Per Host Min Memory Reservation" as a place to look when investigating the memory reservations scheduled for each backend node. Updated documentation with examples of query rejection by admission control and how to read the error message. Testing: - Add BE tests at admission-controller-test.cc - Adjust and pass affected EE tests Change-Id: I1ef7fb7e7a194b2036c2948639a06c392590bf66 Reviewed-on: http://gerrit.cloudera.org:8080/21436 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
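The field names and the MemLimitSourcePB type above come from the commit message; the sketch below only illustrates how they could hang together. The enum values and field numbers are assumptions for illustration.

```proto
syntax = "proto2";
package illustrative;

// Which factor ultimately determined the memory to admit (values assumed).
enum MemLimitSourcePB {
  NO_LIMIT = 0;
  QUERY_PLAN_PER_HOST_MEM_ESTIMATE = 1;
  MEM_LIMIT_QUERY_OPTION = 2;
  POOL_CONFIG_MEM_LIMIT = 3;
}

message QuerySchedulePB {
  // ... other schedule fields elided ...
  optional int64 per_backend_mem_to_admit = 1;
  optional int64 coord_backend_mem_to_admit = 2;
  // New: record what drove the numbers above, so admission control can
  // name the configuration the user should adjust in its error message.
  optional MemLimitSourcePB per_backend_mem_to_admit_source = 3;
  optional MemLimitSourcePB coord_backend_mem_to_admit_source = 4;
}
```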
Commit: | 09d2f10 | |
---|---|---|
Author: | Riza Suminto | |
Committer: | Impala Public Jenkins |
IMPALA-13040: Add waiting mechanism in UpdateFilterFromRemote It is possible for an UpdateFilterFromRemote RPC to arrive at an impalad executor before the QueryState of the destination query is created or has completed initialization. This patch adds a wait mechanism in the UpdateFilterFromRemote RPC endpoint to wait a few milliseconds until the QueryState exists and has completed initialization. The wait time is fixed at 500ms, with exponentially increasing sleep periods in between. If the wait time passes and the QueryState is still not found or initialized, the UpdateFilterFromRemote RPC is deemed failed and query execution moves on without the complete filter. Testing: - Add BE tests in network-util-test.cc - Add test_runtime_filter_aggregation.py::TestLateQueryStateInit - Pass exhaustive runs of test_runtime_filter_aggregation.py, test_query_live.py, and test_query_log.py Change-Id: I156d1f0c694b91ba34be70bc53ae9bacf924b3b9 Reviewed-on: http://gerrit.cloudera.org:8080/21383 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | 45995e6 | |
---|---|---|
Author: | Michael Smith | |
Committer: | Michael Smith |
IMPALA-12540: Query Live Table Defines SystemTable, a kind of in-memory table that provides access to Impala state. Adds the 'impala_query_live' table to the database 'sys', which already exists for 'sys.impala_query_log'. Implements the 'impala_query_live' table to view active queries across all coordinators sharing the same statestore. SystemTables create new SystemTableScanNodes for their scan node implementation. When computing scan range locations, SystemTableScanNodes create a scan range for each coordinator in the cluster (identified via ClusterMembershipMgr). This produces a plan that looks like: Query: explain select * from sys.impala_query_live

| Explain String |
|---|
| Max Per-Host Resource Reservation: Memory=4.00MB Threads=2 |
| Per-Host Resource Estimates: Memory=11MB |
| WARNING: The following tables are missing relevant table and/or column statistics. |
| sys.impala_query_live |
|  |
| PLAN-ROOT SINK |
| \| |
| 01:EXCHANGE [UNPARTITIONED] |
| \| |
| 00:SCAN SYSTEM_TABLE [sys.impala_query_live] |
| row-size=72B cardinality=20 |

Impala's scheduler checks whether the query contains fragments that can be scheduled on coordinators, and if present includes an ExecutorGroup containing all coordinators. These are used to schedule scan ranges that are flagged as 'use_coordinator', allowing SystemTableScanNodes to be scheduled on dedicated coordinators and outside the selected executor group. Execution pulls data from ImpalaServer on the backend via a SystemTableScanner implementation chosen by table name. In the query profile, SYSTEM_TABLE_SCAN_NODE includes ActiveQueryCollectionTime and PendingQueryCollectionTime to track time spent collecting QueryState from ImpalaServer. Grants QueryScanner private access to ImpalaServer, identical to how ImpalaHttpHandler accesses internal server state. Adds custom cluster tests for impala_query_live, and unit tests for changes to the planner and scheduler. Change-Id: Ie2f9a449f0e5502078931e7f1c5df6e0b762c743 Reviewed-on: http://gerrit.cloudera.org:8080/20762 Reviewed-by: Jason Fehr <jfehr@cloudera.com> Reviewed-by: Riza Suminto <riza.suminto@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
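The commit mentions flagging system-table scan ranges with 'use_coordinator' so the scheduler can place them on coordinators even when a separate executor group runs the rest of the query. A minimal sketch of such a flag is below; the message name and field numbers are illustrative assumptions.

```proto
syntax = "proto2";
package illustrative;

// One scan range per coordinator in the cluster membership.
message SystemTableScanRangePB {
  // Backend that should execute this range.
  optional string host_address = 1;
  // Hint for the scheduler: place this range on a coordinator rather than
  // on the executor group selected for the query.
  optional bool use_coordinator = 2 [default = false];
}
```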
Commit: | 4428db3 | |
---|---|---|
Author: | Zoltan Borok-Nagy | |
Committer: | Zoltan Borok-Nagy |
IMPALA-12860: Invoke validateDataFilesExist for RowDelta operations We must invoke validateDataFilesExist for RowDelta operations (DELETE/UPDATE/MERGE). Without this, a concurrent RewriteFiles (compaction) and RowDelta can corrupt a table. IcebergBufferedDeleteSink now also collects the filenames of the data files that are referenced in the position delete files. It adds them to the DML exec state, which is then collected by the Coordinator. The Coordinator passes the file paths to CatalogD, which executes Iceberg's RowDelta operation and now invokes validateDataFilesExist() with the file paths. Additionally, it invokes validateDeletedFiles(). This patch set also resolves IMPALA-12640, which is about replacing IcebergDeleteSink with IcebergBufferedDeleteSink, as from now on we use the buffered version for all DML operations that write position delete files. Testing: * adds new stress test with DELETE + UPDATE + OPTIMIZE Change-Id: I4869eb863ff0afe8f691ccf2fd681a92d36b405c Reviewed-on: http://gerrit.cloudera.org:8080/21099 Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Reviewed-by: Gabor Kaszab <gaborkaszab@cloudera.com>
Commit: | 172925b | |
---|---|---|
Author: | Riza Suminto | |
Committer: | Impala Public Jenkins |
IMPALA-3825: Delegate runtime filter aggregation to some executors IMPALA-4400 improved runtime filters by aggregating them locally before sending filter updates to the coordinator and by sharing a single RuntimeFilterBank for all fragment instances in a query. However, local filter aggregation is still insufficient if the number of nodes in an Impala cluster is large. For example, in a cluster of around 700 impalad backends, aggregation of 1 MB bloom filter updates in the coordinator can take more than 1 second. This patch aims to reduce coordinator load and speed up runtime filter aggregation by doing intermediate aggregation in a few designated Impala backends before the final aggregation and publishing in the coordinator. The query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST is added to control this feature. Given N as the number of backend executors excluding the coordinator, the selected number of intermediate aggregators is M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting MAX_NUM_FILTERS_AGGREGATED_PER_HOST <= 1 disables the intermediate aggregator feature. In the backend scheduler, M impalads are selected randomly as the intermediate aggregators for that runtime filter. Information about these M selected impalads is then passed from the scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The coordinator then converts the RuntimeFilterAggregatorInfoPB into filter routing information, TRuntimeFilterAggDesc, that is piggy-backed in TRuntimeFilterSource. A new RPC endpoint named UpdateFilterFromRemote is added in data_stream_service.proto to handle filter updates from fellow impalad executors to the designated aggregator impalad. This RPC merges filter updates into 'pending_remote_filter'. The intermediate aggregator then combines 'pending_remote_filter' with 'pending_merge_filter' (from local aggregation) into 'result_filter', which is then sent to the coordinator. The RuntimeFilterBank of the intermediate aggregator waits for all remote filter updates for at least RUNTIME_FILTER_WAIT_TIME_MS. If the RuntimeFilterBank is closing and RUNTIME_FILTER_WAIT_TIME_MS has passed, any incomplete filter is marked as ALWAYS_TRUE and sent to the coordinator. This patch currently targets only bloom filters produced by partitioned join builds. Other kinds of runtime filters are still efficient to aggregate in the coordinator only, while a bloom filter from a broadcast join only requires 1 valid filter update for publishing. test_runtime_filters.py is modified to clarify the exec_options dimension and test matrix constraints, and to reduce pytest.skip() calls in each test. runtime_filters.test is also changed to use counter aggregation and to assert on the ExecSummary table so that the assertions stay valid irrespective of the number of fragment instances. We benchmarked the aggregation speed of 1 MB runtime filter aggregation on a 20-executor-node cluster with MT_DOP=36, instrumented to disable local aggregation, simulating 720 runtime filter updates. The speed is approximated as the duration between the earliest time a filter update is made and the time that the coordinator publishes the complete filter.
The results are as follows:

| num aggregator node | Aggregation speed (ms) |
|---|---|
| 0 | 1296 |
| 1 | 1229 |
| 2 | 608 |
| 4 | 329 |
| 8 | 205 |

Testing: - Exercise MAX_NUM_FILTERS_AGGREGATED_PER_HOST in test_runtime_filters.py and query-options-test.cc - Add TestRuntimeFiltersLateRemoteUpdate. - Add custom_cluster/test_runtime_filter_aggregation.py. - Pass exhaustive tests. Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Reviewed-on: http://gerrit.cloudera.org:8080/20612 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
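RuntimeFilterAggregatorInfoPB and the UpdateFilterFromRemote endpoint in data_stream_service.proto are named in the commit message; the sketch below shows one way such definitions could be shaped. The field layout, the request/response contents, and the standalone service wrapper are assumptions for illustration only.

```proto
syntax = "proto2";
package illustrative;

message NetworkAddressPB {
  optional string hostname = 1;
  optional int32 port = 2;
}

// Produced by the scheduler: the M backends chosen as intermediate
// aggregators and how many filter producers report to each of them.
message RuntimeFilterAggregatorInfoPB {
  optional int32 num_aggregators = 1;
  repeated NetworkAddressPB aggregator_krpc_addresses = 2;
  repeated int32 num_reporting_hosts_per_aggregator = 3;
}

// Executor-to-aggregator filter update.
message UpdateFilterParamsPB {
  optional int32 filter_id = 1;
  optional bytes bloom_filter_directory = 2;  // likely a sidecar in practice
  optional bool always_true = 3;
}

message UpdateFilterResultPB {
  optional string status = 1;
}

service DataStreamServiceSketch {
  rpc UpdateFilterFromRemote(UpdateFilterParamsPB) returns (UpdateFilterResultPB);
}
```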
Commit: | e326b3c | |
---|---|---|
Author: | Zoltan Borok-Nagy | |
Committer: | Impala Public Jenkins |
IMPALA-12313: (part 2) Limited UPDATE support for Iceberg tables This patch adds limited UPDATE support for Iceberg tables. The limitations mean users cannot update Iceberg tables if any of the following is true: * the UPDATE sets the value of a partitioning column * the table went through partition evolution * the table has SORT BY properties The above limitations will be resolved by part 3. The usual limitations, like writing non-Parquet files, using copy-on-write, or modifying V1 tables, are out of scope of IMPALA-12313. This patch implements UPDATEs with the merge-on-read technique. This means the UPDATE statement writes both data files and delete files. Data files contain the updated records; delete files contain the position delete records of the old data records that have been touched. To achieve the above, this patch introduces a new sink: MultiDataSink. We can configure multiple TableSinks for a single MultiDataSink object. During execution, the row batches sent to the MultiDataSink are forwarded to all the TableSinks that have been registered. The UPDATE statement for an Iceberg table creates a source select statement with all table columns and the virtual columns INPUT__FILE__NAME and FILE__POSITION. E.g. imagine we have a table 'tbl' with schema (i int, s string, k int), and we update the table with: UPDATE tbl SET k = 5 WHERE i % 100 = 11; The generated source statement will be ==> SELECT i, s, 5, INPUT__FILE__NAME, FILE__POSITION FROM tbl WHERE i % 100 = 11; Then we create two table sinks that refer to expressions from the above source statement: Insert sink (i, s, 5) Delete sink (INPUT__FILE__NAME, FILE__POSITION) The tuples in the row batch of MultiDataSink contain slots for all the above expressions (i, s, 5, INPUT__FILE__NAME, FILE__POSITION). MultiDataSink forwards each row batch to each registered TableSink. They pick their relevant expressions from the tuple and write data/delete files. The tuples are sorted by INPUT__FILE__NAME and FILE__POSITION because we need to write the delete records in this order. For partitioned tables we need to shuffle and sort the input tuples. In this case we also add the virtual columns "PARTITION__SPEC__ID" and "ICEBERG__PARTITION__SERIALIZED" to the source statement and shuffle and sort the rows based on them. Data files and delete files are now separated in the DmlExecState, so at the end of the operation we have two sets of files. We use these two sets to create a new Iceberg snapshot. Why does this patch have the limitations? Because we are shuffling and sorting rows based on the delete records and their partitions. This means that the new data files might not get written in an efficient way, e.g. there will be too many of them, or we will need to keep too many open file handles during writing. Also, if the table has SORT BY properties, we cannot respect it as the input rows are ordered in a way that favors the position deletes. Part 3 will introduce a buffering writer for position delete files. This means we will shuffle and sort records based on the data records' partitions and SORT BY properties, while delete records get buffered and written out at the end (sorted by file_path and position). In some edge cases the delete records might not get written efficiently, but that is a smaller problem than inefficient data files.
Testing: * negative tests * planner tests * update all supported data types * partitioned tables * Impala/Hive interop tests * authz tests * concurrent tests Change-Id: Iff0ef6075a2b6ebe130d15daa389ac1a505a7a08 Reviewed-on: http://gerrit.cloudera.org:8080/20677 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
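The description above notes that data files and delete files are now kept as two separate sets in the DmlExecState so the coordinator can build a single Iceberg snapshot from both. A minimal sketch of that separation is shown here; the message and field names are assumptions (Impala actually serializes Iceberg DataFile descriptors as FlatBuffers, represented as plain bytes in this sketch).

```proto
syntax = "proto2";
package illustrative;

message IcebergDmlFileSetPB {
  // Newly written data files containing the updated records.
  repeated bytes data_files_appended = 1;
  // Newly written position delete files marking the old records as deleted.
  repeated bytes position_delete_files_appended = 2;
}
```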
Commit: | 18c31fd | |
---|---|---|
Author: | Gabor Kaszab | |
Committer: | Impala Public Jenkins |
IMPALA-12308: DIRECTED distribution mode for V2 Iceberg tables For Iceberg tables, when joining the data files with the delete files, both of the current distribution modes (broadcast, partitioned) are wasteful. The idea is that a row read from a delete file contains the name of the data file that this particular delete row refers to, so if we knew where that data file is scheduled, we could send that delete file row directly there. This patch enhances the scheduler to collect information about which data file is scheduled on which host. Since the scan node for the data files is on the same host as the Iceberg join node, we can send the delete files directly to that specific host. Functional testing: - Re-run full test suite to check for regressions. Performance testing: 1) Local machine: SELECT COUNT(1) FROM TPCH10_parquet.lineitem Around 15% of the rows are deleted. As the table is unpartitioned, there is a small number of relatively large delete files. Query runtime decreased by ~80% 2) Local machine: SELECT COUNT(1) FROM TPCDS10_store_sales Around 15% of the rows are deleted. The table is partitioned, which results in more but smaller delete files. Query runtime decreased by ~50% 3) Performance testing in a multi-node cluster with data stored on S3. SELECT COUNT(1) FROM a scaled store_sales table having ~8.6B rows, ~15% of which are deleted. Here we had 2 scenarios: a) Table is written by Impala: one delete file row is sent to exactly one host. b) Table is written by Hive: here the data files are bigger and one data file might be spread across multiple scan ranges. As a result one delete file row might be sent to multiple hosts. The difference from the a) run is the time spent sending out more delete file rows. - Results with 10-node a) Runtime decreased by ~80%. b) Runtime decreased by ~60%. - Results with 20-node a) Runtime decreased by ~65%. b) Runtime decreased by ~42%. - Results with 40-node a) Runtime decreased by ~55%. b) Runtime decreased by ~42%. Change-Id: I212afd7c9e94551a1c50a40ccb0e3c1f7ecdf3d2 Reviewed-on: http://gerrit.cloudera.org:8080/20548 Reviewed-by: Tamas Mate <tmater@apache.org> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
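The scheduler change described above boils down to remembering, per query, which host each data file's scan ranges were assigned to, so delete-file rows can be routed directly to those hosts. The sketch below illustrates such a mapping; all message and field names are assumptions, not the actual structures from the patch.

```proto
syntax = "proto2";
package illustrative;

message DataFileHostsPB {
  optional string data_file_path = 1;
  // More than one entry when a large data file is split across scan ranges
  // on different hosts (the Hive-written case in the measurements above).
  repeated int32 scanning_host_idx = 2;  // index into the query's host list
}

message IcebergDeleteRoutingPB {
  repeated DataFileHostsPB entries = 1;
}
```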
Commit: | 8a2340e | |
---|---|---|
Author: | zhangyifan27 | |
Committer: | Impala Public Jenkins |
IMPALA-12096: Add process start time and version to backends webui This patch adds process start time and version to the /backends page. Two more optional elements are added in BackendDescriptorPB and can be broadcast through statestore topic. This information should be helpful for users checking all backends in a large cluster. For display, as two more columns are added to the table of backend information, the table is changed to 'table-responsive' to be scrolled horizontally with ease. A sample screenshot is attached to the IMPALA-12096 ticket. Testing: - Added cases to test_web_pages.py Change-Id: I5f1f0ba0081986f428840442c247d7dde9e1ba05 Reviewed-on: http://gerrit.cloudera.org:8080/19800 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
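The commit says two more optional elements were added to BackendDescriptorPB and broadcast through the statestore topic. A minimal sketch of that addition is below; the field names and numbers are assumptions for illustration.

```proto
syntax = "proto2";
package illustrative;

message BackendDescriptorPB {
  // ... existing descriptor fields elided ...
  // New: lets the /backends page show when each process started and
  // which version it runs, which is useful in large clusters.
  optional int64 process_start_time_ms = 100;  // assumed name and number
  optional string version = 101;               // assumed name and number
}
```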
Commit: | 57964c8 | |
---|---|---|
Author: | Eyizoha | |
Committer: | Impala Public Jenkins |
IMPALA-11823: Add more items to Impala web UI queries page When operating and maintaining an Impala cluster or analyzing historical query performance, it is helpful to show the memory consumed, the amount of data read, and other information about each query on the historical query page of the web UI. The current historical query page does not display this information, so we collect it while the query executes and display it on the web page. This patch modifies the query list page (/queries) and query detail pages (/query_plan, etc.). On the list page, some metrics are added for each query record, including queuing time, memory usage, memory estimation, bytes read, and bytes sent. In addition, the Details column now shows the query ID, and its position is adjusted so that it is at the top of the record for easy clicking. On the query detail page, a similar record table is added to display the key information of the current query. In addition, a timeline display is added to the summary page (which is exactly the same as the timeline in the profile, just for quick viewing). For queries that are running, the above information is automatically refreshed (only for the plan and summary tabs). To make it clear what each metric means, tooltips are added to all list headers. Change-Id: I19c75461a6405025fa433ae84d2c94d013fcaacb Reviewed-on: http://gerrit.cloudera.org:8080/19417 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | 8935c75 | |
---|---|---|
Author: | Michael Smith | |
Committer: | Michael Smith |
IMPALA-11859: Add bytes-read-encrypted metric Adds a metric bytes-read-encrypted to track encrypted reads. Testing: - ran test_io_metrics.py with Ozone (encrypts by default) - ran test_io_metrics.py with HDFS (no encryption) Change-Id: I9dbc194a4bc31cb0e01545fb6032a0853db60f34 Reviewed-on: http://gerrit.cloudera.org:8080/19461 Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | f350456 | |
---|---|---|
Author: | LPL | |
Committer: | Impala Public Jenkins |
IMPALA-11681: Set table stats for the Iceberg table by its partition stats For Iceberg tables, table-level statistics such as numRows can be computed from Iceberg partition stats, which is more accurate and up to date. Obtaining these statistics is independent of StatsSetupConst.ROW_COUNT and StatsSetupConst.TOTAL_SIZE in HMS. This is an improvement for estimating the cardinality of Iceberg tables. However, the calculation for V2 Iceberg tables is not yet accurate; once IMPALA-11516 (Return better partition stats for V2 tables) is ready, these stats can be considered as a replacement for the HMS statistics. Testing: - Existing tests - Test on 'On-demand Metadata' mode - For 'select * from iceberg_v2_positional_not_all_data_files_have_delete_files where i = (select max(i) from iceberg_v2_positional_update_all_rows)', the 'Join Order' and 'Distribution Mode' are the same as when table stats are present Change-Id: I3e92d3f25e2a57a64556249410d0af3522598c00 Reviewed-on: http://gerrit.cloudera.org:8080/19168 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | cc26f34 | |
---|---|---|
Author: | LPL | |
Committer: | Impala Public Jenkins |
IMPALA-11507: Use absolute_path when Iceberg data files are outside of the table location For Iceberg tables, when one of the following properties is used, the table may have data files outside the table location directory: - 'write.object-storage.enabled' is true - 'write.data.path' is not empty - 'write.location-provider.impl' is configured - 'write.object-storage.path' (deprecated) is not empty - 'write.folder-storage.path' (deprecated) is not empty We should tolerate the situation where the relative path of a data file cannot be derived from the table location path, and use the absolute path in that case. E.g. an ETL program may write a table whose Iceberg metadata is placed in 'hdfs://nameservice_meta/warehouse/hadoop_catalog/ice_tbl/metadata', whose recent data files are in 'hdfs://nameservice_data/warehouse/hadoop_catalog/ice_tbl/data', and whose data files from half a year ago are in 's3a://nameservice_data/warehouse/hadoop_catalog/ice_tbl/data'; such a table should still be queryable by Impala. Testing: - added e2e tests Change-Id: I666bed21d20d5895f4332e92eb30a94fa24250be Reviewed-on: http://gerrit.cloudera.org:8080/18894 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | eda2aa5 | |
---|---|---|
Author: | wzhou-code | |
Committer: | Impala Public Jenkins |
IMPALA-11129: Support running KRPC over Unix Domain Socket This patch makes the following changes to support running KRPC over UDS. - Add FLAGS_rpc_use_unix_domain_socket to enable running KRPC over UDS. Add FLAGS_uds_address_unique_id to specify the unique ID for the UDS address. It could be 'ip_address', 'backend_id', or 'none'. - Add the variable uds_address in NetworkAddressPB and TNetworkAddress. Replace TNetworkAddress with NetworkAddressPB for KRPC-related class variables and APIs. - Set the UDS address for each daemon as @impala-krpc:<unique_id> during initialization, with unique_id specified by the starting flag FLAGS_uds_address_unique_id. - When FLAGS_rpc_use_unix_domain_socket is true, the socket of the KRPC server is bound to the UDS address of the daemon. The KRPC client connects to the KRPC server with the UDS address of the server when creating the proxy service, which in turn calls the kudu::Socket::Connect() function to connect to the KRPC server. - The rpcz Web page shows TCP-related stats as 'N/A' when using UDS. The remote UDS address for KRPC inbound connections is shown on the rpcz Web page as '*' when using UDS, since the remote UDS addresses are not available. - Add new unit tests for UDS. - The BackendId of admissiond is not available. Use admissiond's IP address as the unique ID for UDS. TODO: Advertise the BackendId of admissiond in global admission control mode. Testing: - Passed core tests with FLAGS_rpc_use_unix_domain_socket at its default value false. - Passed core tests with FLAGS_rpc_use_unix_domain_socket set to true. Change-Id: I439f5a03eb425c17451bcaa96a154bb0bca17ee7 Reviewed-on: http://gerrit.cloudera.org:8080/18369 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
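The commit names the new uds_address variable added to NetworkAddressPB (and its Thrift twin). The sketch below shows the rough shape; the spellings of the existing fields and all field numbers are assumptions.

```proto
syntax = "proto2";
package illustrative;

message NetworkAddressPB {
  optional string hostname = 1;
  optional int32 port = 2;
  // New: abstract Unix Domain Socket address used when
  // --rpc_use_unix_domain_socket is set, e.g. "@impala-krpc:<unique_id>".
  optional string uds_address = 3;
}
```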
Commit: | c10e951 | |
---|---|---|
Author: | Zoltan Borok-Nagy | |
Committer: | Impala Public Jenkins |
IMPALA-11053: Impala should be able to read migrated partitioned Iceberg tables When Hive (and probably other engines as well) converts a legacy Hive table to Iceberg it doesn't rewrite the data files. This means that the data files have neither write ids nor partition column data. Currently Impala expects the partition columns to be present in the data files, so it is not able to read converted partitioned tables. With this patch Impala loads partition values from the Iceberg metadata. The extra metadata information is attached to the file descriptor objects and propagated to the scanners. This metadata contains the Iceberg data file format (later it could be used to handle mixed-format tables) and the partition data. We use the partition data in the HdfsScanner to create the template tuple that contains the partition values of identity-partitioned columns. This is not only true for migrated tables, but for all Iceberg tables with identity partitions, which means we also save some IO and CPU time for such columns. The partition information could also be used for Dynamic Partition Pruning later. We use the (human-readable) string representation of the partition data when storing it in the flat buffers. This helps debugging, and it also provides the needed flexibility when the partition columns evolve (e.g. INT -> BIGINT, DECIMAL(4,2) -> DECIMAL(6,2)). Testing * e2e test for all data types that can be used to partition a table * e2e test for migrated partitioned table + schema evolution (without renaming columns) * e2e test for a table where all columns are used as identity-partitions Change-Id: Iac11a02de709d43532056f71359c49d20c1be2b8 Reviewed-on: http://gerrit.cloudera.org:8080/18240 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | 374783c | |
---|---|---|
Author: | stiga-huang | |
Committer: | Impala Public Jenkins |
IMPALA-10898: Add runtime IN-list filters for ORC tables ORC files have optional bloom filter indexes for each column. Since ORC-1.7.0, the C++ reader supports pushing down predicates to skip unrelated RowGroups. The pushed down predicates are evaluated on file indexes (i.e. statistics and bloom filter indexes). Note that only EQUALS and IN-list predicates can leverage bloom filter indexes. Currently Impala has two kinds of runtime filters: bloom filters and min-max filters. Unfortunately they can't be converted into EQUALS or IN-list predicates, so they can't leverage the file-level bloom filter indexes. This patch adds runtime IN-list filters for this purpose. Currently they are generated for the build side of a broadcast join. They will only be applied on ORC tables and be pushed down to the ORC reader (i.e. the ORC lib). To avoid exploding the IN-list, if the number of distinct values of the build side exceeds a threshold (default 1024), we set the filter to ALWAYS_TRUE and clear its entry. The threshold can be configured by a new query option, RUNTIME_IN_LIST_FILTER_ENTRY_LIMIT. Evaluating runtime IN-list filters is much slower than evaluating runtime bloom filters due to the current simple implementation (i.e. std::unordered_set) and the lack of codegen. So we disable it at row level. For visibility, this patch adds two counters in the HdfsScanNode: - NumPushedDownPredicates - NumPushedDownRuntimeFilters They reflect the predicates and runtime filters that are pushed down to the ORC reader. Currently, runtime IN-list filters are disabled by default. This patch extends the query option ENABLED_RUNTIME_FILTER_TYPES to support a comma-separated list of filter types. It defaults to "BLOOM,MIN_MAX". Add "IN_LIST" to it to enable runtime IN-list filters. Ran perf tests on a 3-instance cluster on my desktop using TPC-DS with scale factor 20.
It shows significant improvements in some queries:

| Workload | Query | File Format | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval |
|---|---|---|---|---|---|---|---|---|---|---|---|
| TPCDS(20) | TPCDS-Q67A | orc / snap / block | 35.07 | 44.01 | I -20.32% | 0.38% | 1.38% | 10 | I -25.69% | -3.58 | -45.33 |
| TPCDS(20) | TPCDS-Q37 | orc / snap / block | 1.08 | 1.45 | I -25.23% | 7.14% | 3.09% | 10 | I -34.09% | -3.58 | -12.94 |
| TPCDS(20) | TPCDS-Q70A | orc / snap / block | 6.30 | 8.60 | I -26.81% | 5.24% | 4.21% | 10 | I -36.67% | -3.58 | -14.88 |
| TPCDS(20) | TPCDS-Q16 | orc / snap / block | 1.33 | 1.85 | I -28.28% | 4.98% | 5.92% | 10 | I -39.38% | -3.58 | -12.93 |
| TPCDS(20) | TPCDS-Q18A | orc / snap / block | 5.70 | 8.06 | I -29.25% | 3.00% | 4.12% | 10 | I -40.30% | -3.58 | -19.95 |
| TPCDS(20) | TPCDS-Q22A | orc / snap / block | 2.01 | 2.97 | I -32.21% | 6.12% | 5.94% | 10 | I -47.68% | -3.58 | -14.05 |
| TPCDS(20) | TPCDS-Q77A | orc / snap / block | 8.49 | 12.44 | I -31.75% | 6.44% | 3.96% | 10 | I -49.71% | -3.58 | -16.97 |
| TPCDS(20) | TPCDS-Q75 | orc / snap / block | 7.76 | 12.27 | I -36.76% | 5.01% | 3.87% | 10 | I -59.56% | -3.58 | -23.26 |
| TPCDS(20) | TPCDS-Q21 | orc / snap / block | 0.71 | 1.27 | I -44.26% | 4.56% | 4.24% | 10 | I -77.31% | -3.58 | -28.31 |
| TPCDS(20) | TPCDS-Q80A | orc / snap / block | 9.24 | 20.42 | I -54.77% | 4.03% | 3.82% | 10 | I -123.12% | -3.58 | -40.90 |
| TPCDS(20) | TPCDS-Q39-1 | orc / snap / block | 1.07 | 2.26 | I -52.74% | * 23.83% * | 2.60% | 10 | I -149.68% | -3.58 | -14.43 |
| TPCDS(20) | TPCDS-Q39-2 | orc / snap / block | 1.00 | 2.33 | I -56.95% | * 19.53% * | 2.07% | 10 | I -151.89% | -3.58 | -20.81 |

"Base Avg" is the avg of the original time. "Avg" is the current time. However, we also see some regressions due to the suboptimal implementation. The follow-up JIRAs will focus on improvements: - IMPALA-11140: Codegen InListFilter::Insert() and InListFilter::Find() - IMPALA-11141: Use exact data types in IN-list filters instead of casting data to a set of int64_t or a set of string. - IMPALA-11142: Consider IN-list filters in partitioned joins. Tests: - Test IN-list filter on string, date and all integer types - Test IN-list filter with NULL - Test IN-list filter on complex exprs targets Change-Id: I25080628233799aa0b6be18d5a832f1385414501 Reviewed-on: http://gerrit.cloudera.org:8080/18141 Reviewed-by: Qifan Chen <qchen@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | e36f2b5 | |
---|---|---|
Author: | wzhou-code | |
Committer: | Wenzhe Zhou |
IMPALA-10931 (part 1): Rebase copied Kudu source code Removed be/src/kudu/{util|security|rpc} and copied back over from Kudu commit: 136a8c21549c9d81d96d8ef3cfa4d200b9325f32 Change-Id: Ie7bc10e9436db48169f67765159a3838df5f473a Reviewed-on: http://gerrit.cloudera.org:8080/18235 Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com> Tested-by: Wenzhe Zhou <wzhou@cloudera.com>
Commit: | c8aa579 | |
---|---|---|
Author: | Attila Jeges | |
Committer: | Attila Jeges |
IMPALA-10879: Add parquet stats to iceberg manifest This patch adds parquet stats to iceberg manifest as per-datafile metrics. The following metrics are supported: - column_sizes : Map from column id to the total size on disk of all regions that store the column. Does not include bytes necessary to read other columns, like footers. - null_value_counts : Map from column id to number of null values in the column. - lower_bounds : Map from column id to lower bound in the column serialized as binary. Each value must be less than or equal to all non-null, non-NaN values in the column for the file. - upper_bounds : Map from column id to upper bound in the column serialized as binary. Each value must be greater than or equal to all non-null, non-NaN values in the column for the file. The corresponding parquet stats are collected by 'ColumnStats' (in 'min_value_', 'max_value_', 'null_count_' members) and 'HdfsParquetTableWriter::BaseColumnWriter' (in 'total_compressed_byte_size_' member). Testing: - New e2e test was added to verify that the metrics are written to the Iceberg manifest upon inserting data. - New e2e test was added to verify that lower_bounds/upper_bounds metrics are used to prune data files on querying iceberg tables. - Existing e2e tests were updated to work with the new behavior. - BE test for single-value serialization. Relevant Iceberg documentation: - Manifest: https://iceberg.apache.org/spec/#manifests - Values in lower_bounds and upper_bounds maps should be Single-value serialized to binary: https://iceberg.apache.org/spec/#appendix-d-single-value-serialization Change-Id: Ic31f2260bc6f6a7f307ac955ff05eb154917675b Reviewed-on: http://gerrit.cloudera.org:8080/17806 Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Reviewed-by: Attila Jeges <attilaj@cloudera.com>
Commit: | 05ea14b | |
---|---|---|
Author: | Bikramjeet Vig | |
Committer: | Impala Public Jenkins |
IMPALA-10720: Add versioning to admission heartbeats This patch adds version numbers to admission heartbeats which allows the admission service to ignore stale (out of order) heartbeats from coordinators. Change-Id: I1338211dc0bca67f8fde93a1b05c0780be583d5d Reviewed-on: http://gerrit.cloudera.org:8080/17524 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
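A minimal sketch of the versioning described above: each coordinator stamps its heartbeats with a monotonically increasing number, and the admission service ignores anything older than the highest version it has seen from that coordinator. Message and field names are illustrative, not the actual definitions.

```proto
syntax = "proto2";
package illustrative;

message AdmissionHeartbeatPB {
  optional string coordinator_id = 1;
  // Monotonically increasing per coordinator; a stale (out-of-order)
  // heartbeat carries a smaller value and is dropped by the service.
  optional int64 version = 2;
}
```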
Commit: | c65d786 | |
---|---|---|
Author: | Csaba Ringhofer | |
Committer: | Impala Public Jenkins |
IMPALA-10656: Fire insert events before commit Before this fix Impala committed an insert first, then reloaded the table from HMS, and generated the insert events based on the difference between the two snapshots (e.g. which files were not present in the old snapshot but are there in the new one). Hive replication expects the insert events before the commit, so this may potentially lead to issues there. The solution is to collect the new files during the insert in the backend, and send the insert events based on this file set. This wasn't very hard to do as we were already collecting the files in some cases: - to move them from the staging dir to their final location in case of non-partitioned tables - to write the file list to snapshot files in case of Iceberg tables This patch unifies the paths above and collects all information about the created files regardless of the table type. Testing: - no new tests, insert events were already covered in test_event_processing.py and MetastoreEventsProcessorTest.java - ran core tests Change-Id: I2ed812dbcb5f55efff3a910a3daeeb76cd3295b9 Reviewed-on: http://gerrit.cloudera.org:8080/17313 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | e3bafcb | |
---|---|---|
Author: | Thomas Tauber-Marshall | |
Committer: | Impala Public Jenkins |
IMPALA-10590: Introduce admission service heartbeat mechanism Currently, if a ReleaseQuery rpc fails, it's possible for the admission service to think that some resources are still being used that are actually free. This patch fixes the issue by introducing a periodic heartbeat rpc from coordinators to the admission service which contains a list of queries registered at that coordinator. If there is a query that the admission service thinks is running but is not included in the heartbeat, the admission service can conclude that the query must have already completed and release its resources. Testing: - Added a test that uses a debug action to simulate ReleaseQuery rpcs failing and checks that query resources are released properly. Change-Id: Ia528d92268cea487ada20b476935a81166f5ad34 Reviewed-on: http://gerrit.cloudera.org:8080/17194 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
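The heartbeat described above carries the list of queries still registered at the coordinator, so the admission service can release resources for anything missing from the list. The sketch below illustrates the idea; all message and field names are assumptions.

```proto
syntax = "proto2";
package illustrative;

message UniqueIdPB {
  required fixed64 hi = 1;
  required fixed64 lo = 2;
}

message AdmissionHeartbeatRequestPB {
  optional string coordinator_address = 1;
  // Every query currently registered at this coordinator. A query the
  // admission service believes is running but that is absent here must
  // have completed, so its resources can be released.
  repeated UniqueIdPB registered_query_ids = 2;
}
```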
Commit: | c823f17 | |
---|---|---|
Author: | Thomas Tauber-Marshall | |
Committer: | Impala Public Jenkins |
IMPALA-10577: Add retrying of AdmitQuery This patch adds retries of the AdmitQuery rpc by coordinators. This helps to ensure that if an admissiond goes down and is restarted or is temporarily unreachable, queries won't fail. The retries are done with backoff and jitter to avoid overloading the admissiond in these scenarios. A new flag, --admission_max_retry_time_s, is added to control how long queries will continue retrying before giving up. The AdmitQuery rpc is made idempotent - if a query is submitted with the same query id as one the admissiond already knows about, AdmitQuery will return OK without submitting the query to be scheduled again. Testing: - Added a custom cluster test that checks that queries won't fail when the admissiond goes down. Change-Id: I8bc0cac666bbd613a1143c0e2c4f84d3b0ad003a Reviewed-on: http://gerrit.cloudera.org:8080/17188 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | b5e2a0c | |
---|---|---|
Author: | wzhou-code | |
Committer: | Impala Public Jenkins |
IMPALA-9224: Blacklist nodes with faulty disk for spilling This patch extends the blacklist functionality by adding an executor node to the blacklist if a query fails due to a disk failure during spill-to-disk. It also classifies disk error codes and defines a blacklistable error set for non-transient disk errors. The coordinator blacklists an executor only if the executor hit a blacklistable error during spill-to-disk. Adds a new debug action to simulate a disk write error during spill-to-disk. To use, specify in query options as: 'debug_action': 'IMPALA_TMP_FILE_WRITE:<hostname>:<port>:<action>' where <hostname> and <port> represent the impalad which executes the fragment instances, and <port> is the BE krpc port (default 27000). Adds new test cases for blacklist and query-retry to cover the code changes. Testing: - Passed new test cases. - Passed exhaustive test. - Manually simulated disk failures in scratch directories on nodes of a cluster, verified that the nodes were blacklisted as expected. Change-Id: I04bfcb7f2e0b1ef24a5b4350f270feecd8c47437 Reviewed-on: http://gerrit.cloudera.org:8080/16949 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | 4099a60 | |
---|---|---|
Author: | Fucun Chu | |
Committer: | Impala Public Jenkins |
IMPALA-10317: Add query option that limits huge joins at runtime This patch adds support for limiting the rows produced by a join node so that runaway join queries can be prevented. The limit is specified by a query option; queries exceeding that limit get terminated. The checking runs periodically, so the actual rows produced may go somewhat over the limit. JOIN_ROWS_PRODUCED_LIMIT is exposed as an advanced query option. The query profile is updated to include query-wide and per-backend metrics for rows produced (RowsReturned). Example from "set JOIN_ROWS_PRODUCED_LIMIT = 10000000; select count(*) from tpch_parquet.lineitem l1 cross join (select * from tpch_parquet.lineitem l2 limit 5) l3;": NESTED_LOOP_JOIN_NODE (id=2): - InactiveTotalTime: 107.534ms - PeakMemoryUsage: 16.00 KB (16384) - ProbeRows: 1.02K (1024) - ProbeTime: 0.000ns - RowsReturned: 10.00M (10002025) - RowsReturnedRate: 749.58 K/sec - TotalTime: 13s337ms Testing: Added tests for JOIN_ROWS_PRODUCED_LIMIT Change-Id: Idbca7e053b61b4e31b066edcfb3b0398fa859d02 Reviewed-on: http://gerrit.cloudera.org:8080/16706 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | ca88447 | |
---|---|---|
Author: | Thomas Tauber-Marshall | |
Committer: | Impala Public Jenkins |
IMPALA-9930 (part 2): Introduce new admission control rpc service This patch introduces a new krpc service, AdmissionControlService, which coordinators can use to submit queries for admission. This patch adds some simple configuration flags that make it possible to have coordinators use this service to submit their queries for admission to other coordinators. These flags are only to make this patch testable and will be replaced when the separate admission control daemon is introduced in IMPALA-9975. The interface consists of the following RPCs: - AdmitQuery: takes a TQueryExecRequest and a TQueryOptions (serialized into sidecars), places the request on a queue to be processed by a thread pool and then immediately returns. - GetQueryStatus: takes a query id and returns the current admission status, including the QuerySchedulePB if admission has completed successfully but the query has not been released yet. - ReleaseQueryBackends: called when individual backends complete but the overall query is still running to release resources incrementally. This RPC will be called at most O(log(# backends)) per query due to BackendResourceState, which batches backends to release together. - ReleaseQuery: called when the query has completely finished. Releases all remaining resources. - CancelAdmission: called if a query is cancelled before an admission decision has been made to indicate that it should no longer be considered for admission. The majority of the patch consists of two classes: - AdmissionControlClient: used to abstract whether admission is being performed locally or remotely. In the local case, it is basically just a wrapper around AdmissionController. In the remote case, it handles serializing/deserializing of RPC params, polling GetQueryStatus() until a decision has been made, etc. - AdmissionControlService: exports the RPC interface and acts as a wrapper around AdmissionController. Some notable changes involved: - AdmissionController::SubmitForAdmission() no longer blocks while a query is queued. Instead, a new function WaitOnQueued() can be used to monitor the admission status of a queued query. - Adding events to the query timeline is moved out of AdmissionController and into the AdmissionControlClient classes, so that it always happens on the coordinator. - When a cluster is run in the new admission control service mode, only the impalad that is performing admission control exposes the /admission http endpoint. Observability will be cleaned up in a subsequent patch. Testing: - Modified existing admission control tests to run both with and without the admission control service enabled, including both the functional and stress tests. The 'num_queries' param in the stress test is modified to only use a single value to reduce the number of tests that are run and keep the running time reasonable. - Ran tpch10 on a local minicluster and observed no significant regressions. Change-Id: I594fc593a27b24b6952e381a9bc1a9a5c6b757ae Reviewed-on: http://gerrit.cloudera.org:8080/16412 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
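The five RPCs listed above are named in the commit message; the sketch below arranges them as a protobuf service to show the overall shape of the interface. The request/response message contents are placeholders (the real ones carry Thrift structures as sidecars and return the QuerySchedulePB), and all field numbers are assumed.

```proto
syntax = "proto2";
package illustrative;

message UniqueIdPB {
  required fixed64 hi = 1;
  required fixed64 lo = 2;
}

message AdmitQueryRequestPB      { optional UniqueIdPB query_id = 1; }
message AdmitQueryResponsePB     { optional string status = 1; }
message GetQueryStatusRequestPB  { optional UniqueIdPB query_id = 1; }
message GetQueryStatusResponsePB {
  optional string status = 1;
  optional bytes query_schedule = 2;  // QuerySchedulePB once admitted
}
message ReleaseQueryBackendsRequestPB {
  optional UniqueIdPB query_id = 1;
  repeated string released_backend_addresses = 2;
}
message ReleaseQueryRequestPB    { optional UniqueIdPB query_id = 1; }
message CancelAdmissionRequestPB { optional UniqueIdPB query_id = 1; }
message EmptyResponsePB {}

service AdmissionControlServiceSketch {
  // Queues the request and returns immediately.
  rpc AdmitQuery(AdmitQueryRequestPB) returns (AdmitQueryResponsePB);
  // Polled by the coordinator until an admission decision is made.
  rpc GetQueryStatus(GetQueryStatusRequestPB) returns (GetQueryStatusResponsePB);
  // Incremental release as individual backends finish.
  rpc ReleaseQueryBackends(ReleaseQueryBackendsRequestPB) returns (EmptyResponsePB);
  // Final release when the whole query finishes.
  rpc ReleaseQuery(ReleaseQueryRequestPB) returns (EmptyResponsePB);
  // Withdraw a query that has not been admitted yet.
  rpc CancelAdmission(CancelAdmissionRequestPB) returns (EmptyResponsePB);
}
```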
Commit: | 9429bd7 | |
---|---|---|
Author: | Tim Armstrong | |
Committer: | Impala Public Jenkins |
IMPALA-9382: part 2/3: aggregate profiles sent to coordinator This reworks the status reporting so that serialized AggregatedRuntimeProfile objects are sent from executors to coordinators. These profiles are substantially denser and faster to process for higher mt_dop values. The aggregation is also done in a single step, merging the aggregated thrift profile from the executor directly into the final aggregated profile, instead of converting it to an unaggregated profile first. The changes required were: * A new Update() method for AggregatedRuntimeProfile that updates the profile from a serialised AggregateRuntimeProfile for a subset of the instances. The code is generalized from the existing InitFromThrift() code path. * Per-fragment reports included in the status report protobuf when --gen_experimental_profile=true. * Logic on the coordinator that either consumes serialized AggregatedRuntimeProfile per fragment, when --gen_experimental_profile=true, or consumes a serialized RuntimeProfile per finstance otherwise. This also adds support for event sequences and time series in the aggregated profile, so the amount of information in the aggregated profile is now on par with the basic profile. We also finish off support for JSON profile. The JSON profile is more stripped down because we do not need to round-trip profiles via JSON and it is a much less dense profile representation. Part 3 will clean up and improve the display of the profile. Testing: * Add sanity tests for aggregated runtime profile. * Add unit tests to exercise aggregation of the various counter types * Ran core tests. Change-Id: Ic680cbfe94c939c2a8fad9d0943034ed058c6bca Reviewed-on: http://gerrit.cloudera.org:8080/16057 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | 1af60a1 | |
---|---|---|
Author: | wzhou-code | |
Committer: | Impala Public Jenkins |
IMPALA-9180 (part 3): Remove legacy backend port The legacy Thrift based Impala internal service has been removed so the backend port 22000 can be freed up. This patch set flag be_port as a REMOVED_FLAG and all infrastructures around it are cleaned up. StatestoreSubscriber::subscriber_id is set as hostname + krpc_port. Testing: - Passed the exhaustive test. Change-Id: Ic6909a8da449b4d25ee98037b3eb459af4850dc6 Reviewed-on: http://gerrit.cloudera.org:8080/16533 Reviewed-by: Thomas Tauber-Marshall <tmarshall@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | 981ef10 | |
---|---|---|
Author: | Zoltan Borok-Nagy | |
Committer: | Impala Public Jenkins |
IMPALA-10215: Implement INSERT INTO for non-partitioned Iceberg tables (Parquet) This commit adds support for INSERT INTO statements against Iceberg tables when the table is non-partitioned and the underlying file format is Parquet. We still use Impala's HdfsParquetTableWriter to write the data files, though they needed some modifications to conform to the Iceberg spec, namely: * write Iceberg/Parquet 'field_id' for the columns * TIMESTAMPs are encoded as INT64 micros (without time zone) We use DmlExecState to transfer information from the table sink operators to the coordinator, then updateCatalog() invokes the AppendFiles API to add files atomically. DmlExecState is encoded in protobuf, communication with the Frontend uses Thrift. Therefore to avoid defining Iceberg DataFile multiple times they are stored in FlatBuffers. The commit also does some corrections on Impala type <-> Iceberg type mapping: * Impala TIMESTAMP is Iceberg TIMESTAMP (without time zone) * Impala CHAR is Iceberg FIXED Testing: * Added INSERT tests to iceberg-insert.test * Added negative tests to iceberg-negative.test * I also did some manual testing with Spark. Spark is able to read Iceberg tables written by Impala until we use TIMESTAMPs. In that case Spark rejects the data files because it only accepts TIMESTAMPS with time zone. * Added concurrent INSERT tests to test_insert_stress.py Change-Id: I5690fb6c2cc51f0033fa26caf8597c80a11bcd8e Reviewed-on: http://gerrit.cloudera.org:8080/16545 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | 01316ad | |
---|---|---|
Author: | wzhou-code | |
Committer: | Impala Public Jenkins |
IMPALA-10154: Fix data race on coord_backend_id in TSAN build This issue was introduced by the patch for IMPALA-5746. QueryState::exec_rpc_params_.coord_backend_id is set in the function QueryState::Init(), but it could be accessed by the QueryExecMgr object in QueryExecMgr::CancelQueriesForFailedCoordinators() before or while QueryState::Init() is called, hence causing a data race. To fix it, move coord_backend_id from class ExecQueryFInstancesRequestPB to class TQueryCtx. QueryState::query_ctx_ is a constant variable and is set in the QueryState c'tor, so QueryState::query_ctx_.coord_backend_id is valid and will not be changed once the QueryState object is created. Testing: - Passed tests/custom_cluster/test_process_failures.py. - Passed the core tests for normal build. - Passed the core tests against a TSAN build. Change-Id: I1c4b51e741a28b80bf3485adff8c97aabe0a3f67 Reviewed-on: http://gerrit.cloudera.org:8080/16437 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | 9d43cfd | |
---|---|---|
Author: | wzhou-code | |
Committer: | Impala Public Jenkins |
IMPALA-5746: Cancel all queries scheduled by failed coordinators Executors register for updates of cluster membership. When coordinators are absent from the active cluster membership list, the executor cancels all running fragments of the queries that were scheduled by the inactive coordinators, since the executor cannot send results back to the inactive/failed coordinators. This makes executors quickly release the resources allocated for those cancelled fragments. Testing: - Added new test case TestProcessFailures::test_kill_coordinator and ran the test case with the following command: ./bin/impala-py.test tests/custom_cluster/test_process_failures.py\ ::TestProcessFailures::test_kill_coordinator \ --exploration_strategy=exhaustive. - Passed the core test. Change-Id: I918fcc27649d5d2bbe8b6ef47fbd9810ae5f57bd Reviewed-on: http://gerrit.cloudera.org:8080/16215 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | 5509951 | |
---|---|---|
Author: | wzhou-code | |
Committer: | Impala Public Jenkins |
IMPALA-9294: Support DATE for min-max runtime filter Implemented Date min-max filter and applied it to Kudu as other min-max runtime filters. Added new test cases for Date min-max filters. Testing: Passed all core tests. Change-Id: Ic2f6e2dc6949735d5f0fcf317361cc2969a5e82c Reviewed-on: http://gerrit.cloudera.org:8080/16103 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | b142d03 | |
---|---|---|
Author: | Thomas Tauber-Marshall | |
Committer: | Impala Public Jenkins |
IMPALA-9692 (part 3): Model QuerySchedule as a protobuf In order to support the new admission control service, we need to be able to return the results of an admission attempt, i.e. the query schedule, to the coordinator. To enable this, this patch moves all parts of the QuerySchedule class and related classes that are required by the coordinator into a new message QuerySchedulePB. The main admission control interface, SubmitForAdmission(), now returns a QuerySchedulePB. Some notable changes: - Previously, QuerySchedule was used by Coordinator as a way to pass around references to parts of the TExecRequest to places like Coordinator::ExecSummary and Coordinator::BackendState. This has been replaced with the ExecParams class, which is a container for references to the TExecRequest and QuerySchedulePB along with convenience functions for accessing them. - Similarly, FragmentExecParams, which is part of QuerySchedule, contains references to the associated TPlanFragment, owned by the TExecRequest, which were used by the Coordinator when iterating over the schedule to initiate the query. Since FragmentExecParamsPB can't contain these references, they were replaced by a map between fragment idx and TPlanFragment in ExecParams. - In order to keep payloads reasonable for the eventual RPC interface, AdmissionController::ReleaseQuery() and ReleaseQueryBackend() now take a query id as a parameter instead of a QuerySchedule. To facilitate this, AdmissionController now maintains a map from query ids of running queries to the resources that were allocated for them so that it can look the resources up when releasing them. This map will be necessary when implementing the admission control service to facilitate proper accounting of resources in cases like coordinator failures. - As scheduling is currently organized, we first construct the FragmentExecParams with the FInstanceExecParams as their children, then we construct the BackendExecParams which get references to their FInstanceExecParams. Since we can't send references like these through an rpc, we now instead Swap() the FInstanceExecParamsPB into the BackendExecParamsPB. Testing: - Updated related tests. - Passed a full run of existing tests. Change-Id: I1db64e72f84604b1d8ac24e0bdd4ad6bedd6bcd9 Reviewed-on: http://gerrit.cloudera.org:8080/15961 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
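QuerySchedulePB, FragmentExecParamsPB, FInstanceExecParamsPB, and BackendExecParamsPB are all named above; the sketch below shows one plausible nesting, inferred from the description (instance params swapped into the backend params). Field names, numbers, and contents are assumptions, with many real fields elided.

```proto
syntax = "proto2";
package illustrative;

message FInstanceExecParamsPB {
  optional fixed64 instance_id_lo = 1;
  optional int32 per_fragment_instance_idx = 2;
}

message BackendExecParamsPB {
  optional string address = 1;
  // Swapped in from the per-fragment params during scheduling, since
  // cross-message references cannot travel over an RPC.
  repeated FInstanceExecParamsPB instance_params = 2;
  optional int64 min_mem_reservation_bytes = 3;
}

// Returned by SubmitForAdmission() to the coordinator.
message QuerySchedulePB {
  repeated BackendExecParamsPB backend_exec_params = 1;
  optional int64 per_backend_mem_to_admit = 2;
  optional int64 coord_backend_mem_to_admit = 3;
}
```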
Commit: | 67b4764 | |
---|---|---|
Author: | Tim Armstrong | |
Committer: | Impala Public Jenkins |
IMPALA-9752: aggregate profile stats on executor Before this change the coordinator depended on getting the full fragment instance profiles from executors to pull out various things. This change removes that dependency by pulling out the information on the executor, and including it in the status report protobuf. This should slightly reduce the amount of work done on the coordinator, but more importantly, makes it easier to switch to sending aggregated profiles from executor to coordinator, because the coordinator no longer depends on receiving individual instance profiles. Per-host peak memory is included directly in the status report. Per-backend stats - where the per-backend total is needed - are aggregated on the executor with the result included in the status report. These counters are: BytesRead, ScanRangesComplete, TotalBytesSent, TotalThreads{User,Sys}Time. One subtlety to keep in mind is that status reports don't include stats for instances where the final update was sent in a previous status report. So the executor needs to ensure that stats for finished fragment instances are included in updates. This is achieved by caching those values in FragmentInstanceState. The stats used in the exec summary were previously also plucked out of the profile on the coordinator. This change moves the work to the executor, and includes the per-node stats in the status report. I did a little cleanup of the profile counter declarations, making sure they were consistently inside the impala namespace in the files that I touched. Testing: Ran core tests. Manually checked exec summary, query profiles and backends page for a running query. Change-Id: Ia2aca354d803ce78a798a1a64f9f98353b813e4a Reviewed-on: http://gerrit.cloudera.org:8080/16050 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
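A minimal sketch of what carrying pre-aggregated stats in the status report protobuf could look like, assuming hypothetical message and field names (the counters listed in the commit are real; everything else is illustrative):

```
// Illustrative sketch; names and field numbers are assumptions.
syntax = "proto2";

message AggregatedBackendStatsPB {
  // Totals the coordinator previously extracted from full instance profiles,
  // now aggregated on the executor before the report is sent.
  optional int64 bytes_read = 1;
  optional int64 scan_ranges_complete = 2;
  optional int64 total_bytes_sent = 3;
  optional int64 total_threads_user_time_ns = 4;
  optional int64 total_threads_sys_time_ns = 5;
}

message PerNodeStatsPB {
  optional int32 plan_node_id = 1;     // feeds the exec summary
  optional int64 rows_returned = 2;
  optional int64 peak_mem_bytes = 3;
}

message StatusReportPB {
  optional int64 peak_mem_consumption = 1;   // per-host peak memory
  optional AggregatedBackendStatsPB backend_stats = 2;
  repeated PerNodeStatsPB per_node_stats = 3;
}
```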
Commit: | 51687e5 | |
---|---|---|
Author: | Tim Armstrong | |
Committer: | Tim Armstrong |
IMPALA-9835: make kudu_scan_token binary We got log spam from protobufs because the scan tokens are not valid UTF-8 (and not intended to be). Change-Id: I2db9e4aad777065f6bf6ab1c9add3c99e8a0cf0a Reviewed-on: http://gerrit.cloudera.org:8080/16040 Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Reviewed-by: Thomas Tauber-Marshall <tmarshall@cloudera.com>
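The fix described here is a one-field type change in a .proto file. The sketch below shows the idea; the enclosing message is hypothetical and the real field lives in Impala's scan-range definitions.

```
// Illustrative sketch of the kind of change described above.
syntax = "proto2";

message ScanRangePB {
  // Before: optional string kudu_scan_token = 1;
  // Kudu scan tokens are opaque binary data, so declaring the field as a
  // UTF-8 string made protobuf log warnings about invalid UTF-8.
  // Declaring it as bytes carries arbitrary binary data without the spam.
  optional bytes kudu_scan_token = 1;
}
```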
Commit: | b67c090 | |
---|---|---|
Author: | Thomas Tauber-Marshall | |
Committer: | Impala Public Jenkins |
IMPALA-9692 (part 2): Refactor parts of TExecPlanFragmentInfo to protobuf The new admission control service will be written in protobuf, so there are various admission control related structures currently stored in Thrift that it would be convenient to convert to protobuf, to minimize the amount of converting back and forth that needs to be done. This patch converts some portions of TExecPlanFragmentInfo to protobuf. TExecPlanFragmentInfo is sent as a sidecar with the Exec() RPC, so the refactored parts are now just directly included in the ExecQueryFInstancesRequestPB. The portions that are converted are those that are part of the QuerySchedule, in particular the TPlanFragmentDestination, TScanRangeParams, and TJoinBuildInput. This patch is just a refactor and doesn't contain any functional changes. One notable related change is that DataSink::CreateSink() has two parameters removed - TPlanFragmentCtx (which no longer exists) and TPlanFragmentInstanceCtx. These variables and the new PB equivalents are available via the RuntimeState that was already being passed in as another parameter and don't need to be individually passed in. Testing: - Passed a full run of existing tests. - Ran the single node perf test and didn't detect any regressions. Change-Id: I3a8e46767b257bbf677171ac2f4efb1b623ba41b Reviewed-on: http://gerrit.cloudera.org:8080/15844 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
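To make the refactor concrete, here is a sketch of schedule-related pieces folded directly into the Exec() request message. Names, fields, and numbering are assumptions chosen for exposition, not the actual definitions.

```
// Illustrative sketch; names and field numbers are assumptions.
syntax = "proto2";

message PlanFragmentDestinationPB {
  optional string fragment_instance_id = 1;
  optional string krpc_address = 2;        // where the sender streams rows
}

message ScanRangeParamsPB {
  optional bytes scan_range = 1;           // opaque, format-specific payload
  optional int32 volume_id = 2;
  optional bool try_hdfs_cache = 3;
}

message JoinBuildInputPB {
  optional int32 join_node_id = 1;
  optional string input_finstance_id = 2;
}

message ExecQueryFInstancesRequestPB {
  repeated PlanFragmentDestinationPB destinations = 1;
  repeated ScanRangeParamsPB scan_ranges = 2;
  repeated JoinBuildInputPB join_build_inputs = 3;
  // The remaining plan structures (TPlanNode, TExpr, ...) stay in Thrift and
  // continue to travel as a sidecar alongside this request.
}
```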
Commit: | f7770f1 | |
---|---|---|
Author: | wzhou-code | |
Committer: | Impala Public Jenkins |
IMPALA-3741 [part 1]: Upgraded kudu/util for BloomFilter Ported BlockBloomFilter related source files from Kudu upstream to Impala be/src/kudu/util. These files were taken from Kudu at git hash 389d4f1e1c. Testing: Passed core tests. Change-Id: Ifac41ffb3e1742ffb6a969cb1c368d6d93c23357 Reviewed-on: http://gerrit.cloudera.org:8080/15676 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | e46bbf2 | |
---|---|---|
Author: | Thomas Tauber-Marshall | |
Committer: | Impala Public Jenkins |
IMPALA-9692 (part 1): Refactor TBackendDescriptor to protobuf The new admission control service will be written in protobuf, so there are various admission control related structures currently stored in Thrift that it would be convenient to convert to protobuf, to minimize the amount of converting back and forth that needs to be done. This patch converts TBackendDescriptor to protobuf. It isn't used directly in any rpcs - we serialize it ourselves to send to the statestore as a string, so no rpc definitions are affected. This patch is just a refactor and doesn't contain any functional changes. Testing: - Passed a full run of existing tests. Change-Id: Ie7d1e373d9c87887144517ff6a4c2d5996aa88b8 Reviewed-on: http://gerrit.cloudera.org:8080/15822 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
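Because the descriptor is serialized to a string for the statestore rather than used in an RPC, the message shape is all that matters here. A minimal sketch of a backend descriptor as protobuf follows; field names and numbering are assumptions, not the real BackendDescriptorPB.

```
// Illustrative sketch; names and field numbers are assumptions.
syntax = "proto2";

message BackendDescriptorPB {
  optional string address = 1;         // client-facing address
  optional string krpc_address = 2;    // KRPC endpoint used by other backends
  optional bool is_coordinator = 3;
  optional bool is_executor = 4;
  optional int64 admit_mem_limit = 5;  // consulted by admission control
  // Serialized with SerializeToString() and published via the statestore,
  // so no RPC definition references this message directly.
}
```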
Commit: | fe001a7 | |
---|---|---|
Author: | Thomas Tauber-Marshall | |
Committer: | Thomas Tauber-Marshall |
IMPALA-9335 (part 1): Rebase copied Kudu source code Removed be/src/kudu/{util|security|rpc} and copied back over from Kudu commit: fc4ab691a502067bc4d5bdff30507cac7feb7cfe Change-Id: Ia0190f15b96563f5edc40065fa4690c467c50f83 Reviewed-on: http://gerrit.cloudera.org:8080/15143 Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com> Tested-by: Thomas Tauber-Marshall <tmarshall@cloudera.com>
Commit: | f38da0d | |
---|---|---|
Author: | Tim Armstrong | |
Committer: | Impala Public Jenkins |
IMPALA-4400: aggregate runtime filters locally Move RuntimeFilterBank to QueryState(). Implement fine-grained locking for each filter to mitigate any increased lock contention from the change. Make RuntimeFilterBank handle multiple producers of the same filter, e.g. multiple instances of a partitioned join. It computes the expected number of filters upfront then sends the filter to the coordinator once all the local instances have been merged together. The merging can be done in parallel locally to improve latency of filter propagation. Add Or() methods to MinMaxFilter and BloomFilter, since we now need to merge those, not just the thrift versions. Update coordinator filter routing to expect only one instance of a filter from each producer backend and to only send one instance to each consumer backend (instead of sending one per fragment). Update memory reservations and estimates to be lower to account for sharing of filters between fragment instances. mt_dop plans are modified to show these shared and non-shared resources separately. Enable waiting for runtime filters for kudu scanner with mt_dop. Made min/max filters const-correct. Testing * Added unit tests for Or() methods. * Added some additional e2e test coverage for mt_dop queries * Updated planner tests with new estimates and reservation. * Ran a single node 3-impalad stress test with TPC-H kudu and TPC-DS parquet. * Ran exhaustive tests. * Ran core tests with ASAN. Perf * Did a single-node perf run on TPC-H with default settings. No perf change. * Single-node perf run with mt_dop=8 showed significant speedups: +----------+-----------------------+---------+------------+------------+----------------+ | Workload | File Format | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) | +----------+-----------------------+---------+------------+------------+----------------+ | TPCH(30) | parquet / none / none | 10.14 | -7.29% | 5.05 | -11.68% | +----------+-----------------------+---------+------------+------------+----------------+ +----------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+---------+ | Workload | Query | File Format | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval | +----------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+---------+ | TPCH(30) | TPCH-Q7 | parquet / none / none | 38.87 | 38.44 | +1.13% | 7.17% | * 10.92% * | 20 | +0.72% | 0.72 | 0.39 | | TPCH(30) | TPCH-Q1 | parquet / none / none | 4.28 | 4.26 | +0.50% | 1.92% | 1.09% | 20 | +0.03% | 0.31 | 1.01 | | TPCH(30) | TPCH-Q22 | parquet / none / none | 2.32 | 2.32 | +0.05% | 2.01% | 1.89% | 20 | -0.03% | -0.36 | 0.08 | | TPCH(30) | TPCH-Q15 | parquet / none / none | 3.73 | 3.75 | -0.42% | 0.84% | 1.05% | 20 | -0.25% | -0.77 | -1.40 | | TPCH(30) | TPCH-Q13 | parquet / none / none | 9.80 | 9.83 | -0.38% | 0.51% | 0.80% | 20 | -0.32% | -1.30 | -1.81 | | TPCH(30) | TPCH-Q2 | parquet / none / none | 1.98 | 2.00 | -1.32% | 1.74% | 2.81% | 20 | -0.64% | -1.71 | -1.79 | | TPCH(30) | TPCH-Q6 | parquet / none / none | 1.22 | 1.25 | -2.14% | 2.66% | 4.15% | 20 | -0.96% | -2.00 | -1.95 | | TPCH(30) | TPCH-Q19 | parquet / none / none | 5.13 | 5.22 | -1.65% | 1.20% | 1.40% | 20 | -1.76% | -3.34 | -4.02 | | TPCH(30) | TPCH-Q16 | parquet / none / none | 2.46 | 2.56 | -4.13% | 2.49% | 1.99% | 20 | -4.31% | -4.04 | -5.94 | | TPCH(30) | TPCH-Q9 | 
parquet / none / none | 81.63 | 85.07 | -4.05% | 4.94% | 3.06% | 20 | -5.46% | -3.28 | -3.21 | | TPCH(30) | TPCH-Q10 | parquet / none / none | 5.07 | 5.50 | I -7.92% | 0.96% | 1.33% | 20 | I -8.51% | -5.27 | -22.14 | | TPCH(30) | TPCH-Q21 | parquet / none / none | 24.00 | 26.24 | I -8.57% | 0.46% | 0.38% | 20 | I -9.34% | -5.27 | -67.47 | | TPCH(30) | TPCH-Q18 | parquet / none / none | 8.66 | 9.50 | I -8.86% | 0.62% | 0.44% | 20 | I -9.75% | -5.27 | -55.17 | | TPCH(30) | TPCH-Q3 | parquet / none / none | 6.01 | 6.70 | I -10.19% | 1.01% | 0.90% | 20 | I -11.25% | -5.27 | -35.76 | | TPCH(30) | TPCH-Q12 | parquet / none / none | 2.98 | 3.39 | I -12.23% | 1.48% | 1.48% | 20 | I -13.56% | -5.27 | -27.75 | | TPCH(30) | TPCH-Q11 | parquet / none / none | 1.69 | 2.00 | I -15.55% | 1.63% | 1.47% | 20 | I -18.09% | -5.27 | -34.60 | | TPCH(30) | TPCH-Q4 | parquet / none / none | 2.42 | 2.87 | I -15.69% | 1.48% | 1.26% | 20 | I -18.61% | -5.27 | -39.50 | | TPCH(30) | TPCH-Q14 | parquet / none / none | 4.64 | 6.27 | I -26.02% | 1.35% | 0.73% | 20 | I -35.37% | -5.27 | -94.07 | | TPCH(30) | TPCH-Q20 | parquet / none / none | 3.19 | 4.37 | I -27.01% | 1.54% | 0.99% | 20 | I -36.85% | -5.27 | -80.74 | | TPCH(30) | TPCH-Q5 | parquet / none / none | 4.57 | 6.39 | I -28.36% | 1.04% | 0.75% | 20 | I -39.56% | -5.27 | -120.02 | | TPCH(30) | TPCH-Q17 | parquet / none / none | 3.15 | 4.71 | I -33.06% | 1.59% | 1.31% | 20 | I -49.43% | -5.27 | -87.64 | | TPCH(30) | TPCH-Q8 | parquet / none / none | 5.25 | 7.95 | I -33.95% | 0.95% | 0.53% | 20 | I -51.11% | -5.27 | -185.02 | +----------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+---------+ Change-Id: Iabeeab5eec869ff2197250ad41c1eb5551704acc Reviewed-on: http://gerrit.cloudera.org:8080/14538 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | 702e6c4 | |
---|---|---|
Author: | Fang-Yu Rao | |
Committer: | Tim Armstrong |
IMPALA-7984: Port runtime filter from Thrift RPC to KRPC Previously the aggregation and propagation of a runtime filter in Impala was implemented using Thrift RPC, which suffers from the disadvantage that the number of connections in a cluster grows with both the number of queries and cluster size. This patch ports the functions that implement the aggregation and propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(), respectively, to KRPC, which requires only one connection per direction between every pair of hosts, thus reducing the number of connections in a cluster. In addition, this patch also incorporates KRPC sidecar when the runtime filter is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the Bloom filter contents when a Bloom filter is serialized to be transmitted and hence reduces the serialization overhead. Due to the incorporation of KRPC sidecar, a SpinLock is also added to prevent a BloomFilter from being deallocated before its associated KRPC call finishes. Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are also modified accordingly because of the changes to the signatures of some functions in BloomFilter. Testing: This patch has passed the exhaustive tests. Change-Id: I11a2f92a91750c2470fba082c30f97529524b9c8 Reviewed-on: http://gerrit.cloudera.org:8080/13882 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Reviewed-on: http://gerrit.cloudera.org:8080/14974 Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com> Tested-by: Tim Armstrong <tarmstrong@cloudera.com>
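A sketch of a filter-update RPC where the Bloom filter's bit directory travels as a KRPC sidecar rather than being embedded in the message, which is the copy-avoidance idea described above. Names, fields, and numbering are assumptions for exposition only.

```
// Illustrative sketch; names and field numbers are assumptions.
syntax = "proto2";

message BloomFilterPB {
  optional bool always_true = 1;
  optional bool always_false = 2;
  optional int32 log_bufferpool_space = 3;
  // The bit directory is not embedded here; it is attached as a KRPC sidecar
  // and referenced by index, avoiding an extra serialization copy.
  optional int32 directory_sidecar_idx = 4;
}

message UpdateFilterParamsPB {
  optional int32 filter_id = 1;
  optional string query_id = 2;
  optional BloomFilterPB bloom_filter = 3;
}

message UpdateFilterResultPB {
  optional int32 status_code = 1;
}

service DataStreamService {
  // Executors push locally produced filters to the coordinator here;
  // a PublishFilter RPC (omitted) distributes the aggregated filter back.
  rpc UpdateFilter(UpdateFilterParamsPB) returns (UpdateFilterResultPB);
}
```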
Commit: | 7f7743d | |
---|---|---|
Author: | Sahil Takiar | |
Committer: | Impala Public Jenkins |
IMPALA-9296: Move AuxErrorInfo to StatefulStatus This patch moves AuxErrorInfoPB from FragmentInstanceExecStatusPB to StatefulStatusPB. This is necessary because if the report with the AuxErrorInfoPB is dropped (e.g. due to backpressure at the Coordinator or a flaky network), the next report won't contain the AuxErrorInfoPB, and the error info will be lost. StatefulStatus solves this by detecting any reports that may not have been received by the Coordinator, and re-transmitting any StatefulStatuses that were not successfully delivered. This change also makes the setting of AuxErrorInfoPB stateful, so that the error info only shows up in one report and is then dropped from the RuntimeState. Change-Id: Iabbb48dd3ab58ba7b76b1ab6979b92d0e25e72e3 Reviewed-on: http://gerrit.cloudera.org:8080/15046 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
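The retransmission idea can be sketched as a status tagged with the sequence number of the report it first shipped with, so the executor knows to re-send it until some later report is acknowledged. All names and fields below are assumptions, not the actual definitions.

```
// Illustrative sketch; names and field numbers are assumptions.
syntax = "proto2";

message AuxErrorInfoPB {
  // Contents elided; see the RPC error sketch under IMPALA-9137 below.
}

message StatefulStatusPB {
  // Sequence number of the report this status was first attached to; if the
  // coordinator never acknowledges that report, the status is re-sent with
  // the next one instead of being silently lost.
  optional int64 report_seq_no = 1;
  optional AuxErrorInfoPB aux_error_info = 2;
}

message FragmentInstanceExecStatusPB {
  optional int64 report_seq_no = 1;
  repeated StatefulStatusPB stateful_report = 2;
}
```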
Commit: | 8a4fece | |
---|---|---|
Author: | Sahil Takiar | |
Committer: | Impala Public Jenkins |
IMPALA-9137: Blacklist node if a DataStreamService RPC to the node fails Introduces a new optional field to FragmentInstanceExecStatusPB: AuxErrorInfoPB. AuxErrorInfoPB contains optional metadata associated with a failed fragment instance. Currently, AuxErrorInfoPB only contains one field: RPCErrorInfoPB, which is only set if the fragment failed because a RPC to another impalad failed. The RPCErrorInfoPB contains the destination node of the failed RPC and the posix error code of the failed RPC. Coordinator::UpdateBackendExecStatus(ReportExecStatusRequestPB, ...) uses the information in RPCErrorInfoPB (if one is set) to blacklist the target node. While RPCErrorInfoPB::dest_node can be set to the address of the Coordinator, the Coordinator will not blacklist itself. The Coordinator only blacklists the node if the RPC failed with a specific error code (currently either ENOTCONN, ECONNREFUSED, ESHUTDOWN). Testing: * Ran core tests * Added new test to test_blacklist.py Change-Id: I733cca13847fde43c8ea2ae574d3ae04bd06419c Reviewed-on: http://gerrit.cloudera.org:8080/14677 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
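A short sketch of the auxiliary error metadata described above, with the RPC failure details nested inside it. Field names and numbering are assumptions; the error codes mentioned in the comment are the ones the commit lists.

```
// Illustrative sketch; names and field numbers are assumptions.
syntax = "proto2";

message RPCErrorInfoPB {
  optional string dest_node = 1;        // target of the failed DataStreamService RPC
  optional int32 posix_error_code = 2;  // e.g. ENOTCONN, ECONNREFUSED, ESHUTDOWN
}

message AuxErrorInfoPB {
  // Only set when the fragment instance failed because an RPC to another
  // impalad failed; the coordinator uses it to decide whether to blacklist
  // the destination node (it never blacklists itself).
  optional RPCErrorInfoPB rpc_error_info = 1;
}
```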
Commit: | 313d758 | |
---|---|---|
Author: | Tim Armstrong | |
Committer: | Impala Public Jenkins |
IMPALA-9235: add more per-connection stats to /rpcz This backports commit 0f6d33b4a29873197952335a5777ccf9163fc307 from Kudu and makes corresponding Impala changes. It also exposes some additional information returned by KRPC, including per-connection information for inbound connections and in-flight RPCs. Some of these are in the JSON only, others are exposed in the data tables on the debug page. This adds some Protobuf -> JSON utilities similar to those used in Kudu, except instead of outputting strings, they append to rapidjson documents. Testing: Added a sanity test to test_web_pages. Manually checked the /rpcz page when running queries. Backport notes: Mostly this was a clean cherry-pick. I omitted the changes to be/src/kudu/rpc/rpc-test.cc, since those seem to depend on previous Kudu changes and we don't run that test anyway. I also omitted the changes to Kudu's server code, which we don't have copied here. The Kudu commit message is reproduced here: ========================================= rpc: add TCP socket statistics to /rpcz This adds the ability to fetch various bits of socket-level information for each RPC connection and publish the info into /rpcz. The information itself is fetched using getsockopt(TCP_INFO) as well as ioctls to check the current send and receive queue lengths. This data can help resolve whether a use case is network bound or bound by the application itself. For example, a high number of retransmitted packets can indicate that the network path to the receiver is overloaded. Eventually we may want to expose some of this information on a per-call basis. However, doing so is quite tricky, since 'send()' completes when the data has been placed into the outbound packet queue and doesn't wait until the data is ACKed. We'd need to defer checking for retransmissions until all of the data has been ACKed, which is at some indeterminate point in the future. The very newest kernels allow subscribing to such notifications (along with lots of interesting stats) but, given none of that is available in el7, it's probably not worth tackling at this point. ========================================= Change-Id: I3696463e22123fe81073af4aa495a96b7d4f7ee2 Reviewed-on: http://gerrit.cloudera.org:8080/14884 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | a1588e4 | |
---|---|---|
Author: | Thomas Tauber-Marshall | |
Committer: | Impala Public Jenkins |
IMPALA-9181: Serialize TQueryCtx once per query When issuing Exec() rpcs to backends, we currently serialize the TQueryCtx once per backend. This is inefficient as the TQueryCtx is the same for all backends and really only needs to be serialized once. Serializing the TQueryCtx can be expensive as it contains both the full text of the original query and the descriptor table, which can be quite large. In a synthetic dataset I tested with, scanning a table with 100k partitions leads to a descriptor table size of ~20MB. This patch serializes the TQueryCtx in the coordinator and then passes it to each BackendState when calling Exec(). Followup work might consider if we really need all of the info in the TQueryCtx to be distributed to all backends. Testing: - Passed full run of existing tests. - Single node perf run showed no significant change. Change-Id: I6a4dd302fd5602ec2775492a041ddd51e7d7a6c6 Reviewed-on: http://gerrit.cloudera.org:8080/14777 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | e716e76 | |
---|---|---|
Author: | Fang-Yu Rao | |
Committer: | Impala Public Jenkins |
IMPALA-9154: Revert "IMPALA-7984: Port runtime filter from Thrift RPC to KRPC" The previous patch porting runtime filter from Thrift RPC to KRPC introduced a deadlock when there is a very limited number of threads on the Impala cluster. Specifically, in that patch a Coordinator used a synchronous KRPC to propagate an aggregated filter to other hosts. A deadlock would happen if there is no thread available on the receiving side to answer that KRPC, especially when the calling and receiving threads are drawn from the same thread pool. One possible way to address this issue is to make the call that propagates a runtime filter asynchronous, freeing the calling thread. Until that issue is resolved, we revert this patch for now. This reverts commit ec11c18884988e838a8838e1e8ecc37461e1a138. Change-Id: I32371a515fb607da396914502da8c7fb071406bc Reviewed-on: http://gerrit.cloudera.org:8080/14780 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | ec11c18 | |
---|---|---|
Author: | Fang-Yu Rao | |
Committer: | Impala Public Jenkins |
IMPALA-7984: Port runtime filter from Thrift RPC to KRPC Previously the aggregation and propagation of a runtime filter in Impala was implemented using Thrift RPC, which suffers from the disadvantage that the number of connections in a cluster grows with both the number of queries and cluster size. This patch ports the functions that implement the aggregation and propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(), respectively, to KRPC, which requires only one connection per direction between every pair of hosts, thus reducing the number of connections in a cluster. In addition, this patch also incorporates KRPC sidecar when the runtime filter is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the Bloom filter contents when a Bloom filter is serialized to be transmitted and hence reduces the serialization overhead. Due to the incorporation of KRPC sidecar, a SpinLock is also added to prevent a BloomFilter from being deallocated before its associated KRPC call finishes. Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are also modified accordingly because of the changes to the signatures of some functions in BloomFilter. Testing: This patch has passed the exhaustive tests. Change-Id: I6b394796d250286510e157ae326882bfc01d387a Reviewed-on: http://gerrit.cloudera.org:8080/13882 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | b8e2090 | |
---|---|---|
Author: | Todd Lipcon | |
Committer: | Impala Public Jenkins |
Update kudu/security from 9ebcb77aa911aae76c48e717af24e643cb81908d This updates the kudu security code to the latest version, which includes support for GSSAPI calls, necessary for SPNEGO. This is a straight rsync of the kudu/util source code except for some minor CMakeLists changes that were carried over from the old version. Change-Id: Ie7c91193fd49f8ca1234b23cf61fc90c1fdbe2e0 Reviewed-on: http://gerrit.cloudera.org:8080/13767 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | 30c3cd9 | |
---|---|---|
Author: | Thomas Tauber-Marshall | |
Committer: | Impala Public Jenkins |
IMPALA-7467: Port ExecQueryFInstances to krpc This patch ports the ExecQueryFInstances rpc to use KRPC. The parameters for this call contain a huge number of Thrift structs (eg. everything related to TPlanNode and TExpr), so to avoid converting all of these to protobuf and the resulting effect that would have on the FE and catalog, this patch stores most of the parameters in a sidecar (in particular the TQueryCtx, TPlanFragmentCtx's, and TPlanFragmentInstanceCtx's). Testing: - Passed a full exhaustive run on the minicluster. Set up a ten node cluster with tpch 500: - Ran perf tests: 3 iterations per tpch query, 4 concurrent streams, no perf change. - Ran the stress test for 1000 queries, passed. Change-Id: Id3f1c6099109bd8e5361188005a7d0e892147570 Reviewed-on: http://gerrit.cloudera.org:8080/13583 Reviewed-by: Thomas Tauber-Marshall <tmarshall@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
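The "keep the plan in Thrift, ship it as a sidecar" approach can be sketched as a small protobuf request that only carries identifiers plus a sidecar index for the serialized Thrift payload. Names, fields, and numbering below are assumptions, not the exact definitions this change added.

```
// Illustrative sketch; names and field numbers are assumptions.
syntax = "proto2";

message ExecQueryFInstancesRequestPB {
  optional string query_id = 1;
  optional int32 coord_state_idx = 2;
  // The serialized Thrift payload (TQueryCtx, TPlanFragmentCtx, and
  // TPlanFragmentInstanceCtx structs) is attached as a KRPC sidecar and
  // referenced here by index, so TPlanNode/TExpr never need protobuf
  // equivalents.
  optional int32 plan_fragment_info_sidecar_idx = 3;
}

message ExecQueryFInstancesResponsePB {
  optional int32 status_code = 1;
}

service ControlService {
  rpc ExecQueryFInstances(ExecQueryFInstancesRequestPB)
      returns (ExecQueryFInstancesResponsePB);
}
```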
Commit: | dfb9e16 | |
---|---|---|
Author: | Lars Volker | |
Committer: | Impala Public Jenkins |
IMPALA-7006: Add KRPC folders from kudu@334ecafd cp -a ~/checkout/kudu/src/kudu/{rpc,util,security} be/src/kudu/ Change-Id: I232db2b4ccf5df9aca87b21dea31bfb2735d1ab7 Reviewed-on: http://gerrit.cloudera.org:8080/10757 Reviewed-by: Lars Volker <lv@cloudera.com> Tested-by: Lars Volker <lv@cloudera.com>
Commit: | 0c2d3c7 | |
---|---|---|
Author: | Lars Volker | |
Committer: | Impala Public Jenkins |
IMPALA-7006: Remove KRPC folders Change-Id: Ic677484c27ed18b105da0a6b0901df4eb9f248e6 Reviewed-on: http://gerrit.cloudera.org:8080/10756 Reviewed-by: Lars Volker <lv@cloudera.com> Tested-by: Lars Volker <lv@cloudera.com>
Commit: | adde66b | |
---|---|---|
Author: | Andrew Sherman | |
Committer: | Impala Public Jenkins |
IMPALA-7985: Port RemoteShutdown() to KRPC. The :shutdown command is used to shutdown a remote server. The common case is that a user specifies the impalad to shutdown by specifying a host e.g. :shutdown('host100'). If a user has more than one impalad on a remote host then the form :shutdown('<host>:<port>') can be used to specify the port by which the impalad can be contacted. Prior to IMPALA-7985 this port was the backend port, e.g. :shutdown('host100:22000'). With IMPALA-7985 the port to use is the KRPC port, e.g. :shutdown('host100:27000'). Shutdown is implemented by making an rpc call to the target impalad. This changes the implementation of this call to use KRPC. To aid the user in finding the KRPC port, the KRPC address is added to the /backends section of the debug web page. We attempt to detect the case where :shutdown is pointed at a thrift port (like the backend port) and print an informative message. Documentation of this change will be done in IMPALA-8098. Further improvements to DoRpcWithRetry() will be done in IMPALA-8143. For discussion of why it was chosen to implement this change in an incompatible way, see comments in https://issues.apache.org/jira/browse/IMPALA-7985. TESTING Ran all end-to-end tests. Enhance the test for /backends in test_web_pages.py. In test_restart_services.py add a call to the old backend port to the test. Some expected error messages were changed in line with what KRPC returns. Change-Id: I4fd00ee4e638f5e71e27893162fd65501ef9e74e Reviewed-on: http://gerrit.cloudera.org:8080/12260 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | b1e4957 | |
---|---|---|
Author: | Thomas Tauber-Marshall | |
Committer: | Impala Public Jenkins |
IMPALA-4555: Make QueryState's status reporting more robust QueryState periodically collects runtime profiles from all of its fragment instances and sends them to the coordinator. Previously, each time this happens, if the rpc fails, QueryState will retry twice after a configurable timeout and then cancel the fragment instances under the assumption that the coordinator no longer exists. We've found in real clusters that this logic is too sensitive to failed rpcs and can result in fragment instances being cancelled even in cases where the coordinator is still running. This patch makes a few improvements to this logic: - When a report fails to send, instead of retrying the same report quickly (after waiting report_status_retry_interval_ms), we wait the regular reporting interval (status_report_interval_ms), regenerate any stale portions of the report, and then retry. - A new flag, --status_report_max_retries, is introduced, which controls the number of failed reports that are allowed before the query is cancelled. --report_status_retry_interval_ms is removed. - Backoff is used for repeated failed attempts, such that for a period between retries of 't', on try 'n' the actual timeout will be t * n. Testing: - Added a test which results in a large number of failed intermediate status reports but still succeeds. Change-Id: Ib6007013fc2c9e8eeba11b752ee58fb3038da971 Reviewed-on: http://gerrit.cloudera.org:8080/12049 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | e4cff7d | |
---|---|---|
Author: | Andrew Sherman | |
Committer: | Impala Public Jenkins |
IMPALA-7468: Port CancelQueryFInstances() to KRPC. When the Coordinator needs to cancel a query (for example because a user has hit Control-C), it does this by sending a CancelQueryFInstances message to each fragment instance. This change switches this code to use KRPC. Add new protobuf definitions for the messages, and remove the old thrift definitions. Move the server-side implementation of Cancel() from ImpalaInternalService to ControlService. Rework the scheduler so that the FInstanceExecParams always contains the KRPC address of the fragment executors, this address can then be used if a query is to be cancelled. For now keep the KRPC calls to CancelQueryFInstances() as synchronous. While moving the client-side code, remove the fault injection code that was inserted with FAULT_INJECTION_SEND_RPC_EXCEPTION and FAULT_INJECTION_RECV_RPC_EXCEPTION (triggered by running impalad with --fault_injection_rpc_exception_type=1) as this tickles code in client-cache.h which is now not used. TESTING: Ran all end-to-end tests. No new tests as test_cancellation.py provides good coverage. Checked in debugger that DebugAction style fault injection (triggered from test_cancellation.py) was working correctly. Change-Id: I625030c3f1068061aa029e6e242f016cadd84969 Reviewed-on: http://gerrit.cloudera.org:8080/12142 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
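Since the commit adds new protobuf definitions for the cancel messages and moves the handler to ControlService, a minimal sketch of what such definitions look like follows. Field names and numbering are assumptions; only the service and RPC names are taken from the commit message.

```
// Illustrative sketch; field names and numbers are assumptions.
syntax = "proto2";

message UniqueIdPB {
  optional fixed64 hi = 1;
  optional fixed64 lo = 2;
}

message CancelQueryFInstancesRequestPB {
  // All fragment instances of this query on the receiving backend are cancelled.
  optional UniqueIdPB query_id = 1;
}

message CancelQueryFInstancesResponsePB {
  optional int32 status_code = 1;
}

service ControlService {
  // Kept synchronous for now, per the commit message.
  rpc CancelQueryFInstances(CancelQueryFInstancesRequestPB)
      returns (CancelQueryFInstancesResponsePB);
}
```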
Commit: | 9410382 | |
---|---|---|
Author: | Michael Ho | |
Committer: | Impala Public Jenkins |
IMPALA-4063: Merge report of query fragment instances per executor Previously, each fragment instance executing on an executor would independently report its status to the coordinator periodically. This created a huge number of RPCs to the coordinator under highly concurrent workloads, causing lock contention in the coordinator's backend states when multiple fragment instances send them at the same time. In addition, due to the lack of coordination between query fragment instances, a query may end without collecting the profiles from all fragment instances when one of them hits an error before another fragment instance manages to finish Prepare(), leading to missing profiles for certain fragment instances. This change fixes the problem above by making a thread per QueryState (started by QueryExecMgr) responsible for periodically reporting the status and profiles of all fragment instances of a query running on a backend. As part of this refactoring, each query fragment instance no longer reports its errors individually. Instead, there is a cumulative status maintained per QueryState. It's set to the error status of the first fragment instance which hits an error or any general error (e.g. failure to start a thread) when starting fragment instances. With this change, the status reporting threads are also removed. Testing done: exhaustive tests. This patch is based on a patch by Sailesh Mukil. Change-Id: I5f95e026ba05631f33f48ce32da6db39c6f421fa Reviewed-on: http://gerrit.cloudera.org:8080/11615 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
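Structurally, "one report per QueryState" means the status report carries a repeated list of per-instance statuses plus a single cumulative status, roughly as sketched below. Names, fields, and numbering are assumptions for exposition.

```
// Illustrative sketch; names and field numbers are assumptions.
syntax = "proto2";

message FragmentInstanceExecStatusPB {
  optional string fragment_instance_id = 1;
  optional bool done = 2;
  optional int32 profile_sidecar_idx = 3;  // this instance's serialized profile
}

message ReportExecStatusRequestPB {
  optional string query_id = 1;
  // One cumulative status per QueryState: set to the first error seen, or to
  // a general error (e.g. failure to start a thread) during startup.
  optional int32 overall_status_code = 2;
  optional string status_msg = 3;
  // Statuses of every fragment instance running on this backend, batched
  // into a single periodic RPC instead of one RPC per instance.
  repeated FragmentInstanceExecStatusPB instance_exec_status = 4;
}
```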
Commit: | 5391100 | |
---|---|---|
Author: | Michael Ho | |
Committer: | Impala Public Jenkins |
IMPALA-7213, IMPALA-7241: Port ReportExecStatus() RPC to use KRPC This change converts ReportExecStatus() RPC from thrift based RPC to KRPC. This is done in part of the preparation for fixing IMPALA-2990 as we can take advantage of TCP connection multiplexing in KRPC to avoid overwhelming the coordinator with too many connections by reducing the number of TCP connection to one for each executor. This patch also introduces a new service pool for all query execution control related RPCs in the future so that control commands from coordinators aren't blocked by long-running DataStream services' RPCs. To avoid unnecessary delays due to sharing the network connections between DataStream service and Control service, this change added the service name as part of the user credentials for the ConnectionId so each service will use a separate connection. The majority of this patch is mechanical conversion of some Thrift structures used in ReportExecStatus() RPC to Protobuf. Note that the runtime profile is still retained as a Thrift structure as Impala clients will still fetch query profiles using Thrift RPCs. This also avoids duplicating the serialization implementation in both Thrift and Protobuf for the runtime profile. The Thrift runtime profiles are serialized and sent as a sidecar in ReportExecStatus() RPC. This patch also fixes IMPALA-7241 which may lead to duplicated dml stats being applied. The fix is by adding a monotonically increasing version number for fragment instances' reports. The coordinator will ignore any report smaller than or equal to the version in the last report. Testing done: 1. Exhaustive build. 2. Added some targeted test cases for profile serialization failure and RPC retries/timeout. Change-Id: I7638583b433dcac066b87198e448743d90415ebe Reviewed-on: http://gerrit.cloudera.org:8080/10855 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
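Two details of this port lend themselves to a small sketch: the Thrift profile travelling as a sidecar, and the monotonically increasing report version used to reject duplicate reports (the IMPALA-7241 fix). Field names and numbering are assumptions, not the real message.

```
// Illustrative sketch; names and field numbers are assumptions.
syntax = "proto2";

message ReportExecStatusRequestPB {
  optional string query_id = 1;
  // Monotonically increasing report version; the coordinator ignores any
  // report whose version is <= the last one it applied, so a retried RPC
  // cannot double-count DML stats.
  optional int64 report_version = 2;
  // The runtime profile stays a Thrift structure (clients still fetch
  // profiles over Thrift RPCs); it is serialized once and attached as a
  // KRPC sidecar rather than being duplicated in protobuf.
  optional int32 thrift_profiles_sidecar_idx = 3;
}
```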
Commit: | 5c541b9 | |
---|---|---|
Author: | Michael Ho | |
Committer: | Impala Public Jenkins |
Add missing authorization in KRPC In 2.12.0, Impala adopted the Kudu RPC library for certain backend services (TransmitData(), EndDataStream()). While the implementation uses Kerberos for authenticating users connecting to the backend services, there is no authorization implemented. This is a regression from the Thrift based implementation because it registered a SASL callback (SaslAuthorizeInternal) to be invoked during the connection negotiation. With this regression, an unauthorized but authenticated user may invoke RPC calls to Impala backend services. This change fixes the issue above by overriding the default authorization method for the DataStreamService. The authorization method will only let an authenticated principal that matches FLAGS_principal / FLAGS_be_principal access the service. Also added a new startup flag --krb5_ccname to allow users to customize the location of the Kerberos credentials cache. Testing done: 1. Added a new test case in rpc-mgr-kerberized-test.cc to confirm an unauthorized user is not allowed to access the service. 2. Ran some queries in a Kerberos enabled cluster to make sure there is no error. 3. Exhaustive builds. Thanks to Todd Lipcon for pointing out the problem and his guidance on the fix. Change-Id: I2f82dee5e721f2ed23e75fd91abbc6ab7addd4c5 Reviewed-on: http://gerrit.cloudera.org:8080/11331 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Commit: | 39870d4 | |
---|---|---|
Author: | Lars Volker | |
Committer: | Lars Volker |
IMPALA-7006: Remove KRPC folders Change-Id: Ic677484c27ed18b105da0a6b0901df4eb9f248e6 Reviewed-on: http://gerrit.cloudera.org:8080/10756 Reviewed-by: Lars Volker <lv@cloudera.com> Tested-by: Lars Volker <lv@cloudera.com>
Commit: | fcf190c | |
---|---|---|
Author: | Lars Volker | |
Committer: | Lars Volker |
IMPALA-7006: Add KRPC folders from kudu@334ecafd cp -a ~/checkout/kudu/src/kudu/{rpc,util,security} be/src/kudu/ Change-Id: I232db2b4ccf5df9aca87b21dea31bfb2735d1ab7 Reviewed-on: http://gerrit.cloudera.org:8080/10757 Reviewed-by: Lars Volker <lv@cloudera.com> Tested-by: Lars Volker <lv@cloudera.com>
Commit: | 3855cb0 | |
---|---|---|
Author: | Michael Ho | |
Committer: | Impala Public Jenkins |
IMPALA-6685: Improve profiles in KrpcDataStreamRecvr and KrpcDataStreamSender This change implements a couple of improvements to the profiles of KrpcDataStreamRecvr and KrpcDataStreamSender: - track pending number of deferred row batches over time in KrpcDataStreamRecvr - track the number of bytes dequeued over time in KrpcDataStreamRecvr - track the total time deferred RPCs queues are not empty - track the number of bytes sent from KrpcDataStreamSender over time - track the total amount of time spent in KrpcDataStreamSender, including time spent waiting for RPC completion. Sample profile of an Exchange node instance: EXCHANGE_NODE (id=21):(Total: 2s284ms, non-child: 64.926ms, % non-child: 2.84%) - ConvertRowBatchTime: 44.380ms - PeakMemoryUsage: 124.04 KB (127021) - RowsReturned: 287.51K (287514) - RowsReturnedRate: 125.88 K/sec Buffer pool: - AllocTime: 1.109ms - CumulativeAllocationBytes: 10.96 MB (11493376) - CumulativeAllocations: 562 (562) - PeakReservation: 112.00 KB (114688) - PeakUnpinnedBytes: 0 - PeakUsedReservation: 112.00 KB (114688) - ReadIoBytes: 0 - ReadIoOps: 0 (0) - ReadIoWaitTime: 0.000ns - WriteIoBytes: 0 - WriteIoOps: 0 (0) - WriteIoWaitTime: 0.000ns Dequeue: BytesDequeued(500.000ms): 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 700.00 KB, 2.00 MB, 3.49 MB, 4.39 MB, 5.86 MB, 6.85 MB - FirstBatchWaitTime: 0.000ns - TotalBytesDequeued: 6.85 MB (7187850) - TotalGetBatchTime: 2s237ms - DataWaitTime: 2s219ms Enqueue: BytesReceived(500.000ms): 0, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 328.73 KB, 963.79 KB, 1.64 MB, 2.09 MB, 2.76 MB, 3.23 MB DeferredQueueSize(500.000ms): 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 0, 0 - DispatchTime: (Avg: 108.593us ; Min: 30.525us ; Max: 1.524ms ; Number of samples: 281) - DeserializeRowBatchTime: 8.395ms - TotalBatchesEnqueued: 281 (281) - TotalBatchesReceived: 281 (281) - TotalBytesReceived: 3.23 MB (3387144) - TotalEarlySenders: 0 (0) - TotalEosReceived: 1 (1) - TotalHasDeferredRPCsTime: 15s446ms - TotalRPCsDeferred: 38 (38) Sample sender's profile: KrpcDataStreamSender (dst_id=21):(Total: 17s923ms, non-child: 604.494ms, % non-child: 3.37%) BytesSent(500.000ms): 0, 0, 0, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 46.54 KB, 46.54 KB, 46.54 KB, 58.31 KB, 58.31 KB, 58.31 KB, 58.31 KB, 58.31 KB, 58.31 KB, 58.31 KB, 974.44 KB, 2.82 MB, 4.93 MB, 6.27 MB, 8.28 MB, 9.69 MB - EosSent: 3 (3) - NetworkThroughput: 4.61 MB/sec - PeakMemoryUsage: 22.57 KB (23112) - RowsSent: 287.51K (287514) - RpcFailure: 0 (0) - RpcRetry: 0 (0) - SerializeBatchTime: 329.162ms - TotalBytesSent: 9.69 MB (10161432) - UncompressedRowBatchSize: 20.56 MB (21563550) Change-Id: I8ba405921b3df920c1e85b940ce9c8d02fc647cd Reviewed-on: http://gerrit.cloudera.org:8080/9690 Reviewed-by: Michael Ho <kwho@cloudera.com> Tested-by: Impala Public Jenkins
Commit: | 421af4e | |
---|---|---|
Author: | Michael Ho | |
Committer: | Impala Public Jenkins |
IMPALA-6685: Improve profiles in KrpcDataStreamRecvr and KrpcDataStreamSender This change implements a couple of improvements to the profiles of KrpcDataStreamRecvr and KrpcDataStreamSender: - track pending number of deferred row batches over time in KrpcDataStreamRecvr - track the number of bytes dequeued over time in KrpcDataStreamRecvr - track the total time deferred RPCs queues are not empty - track the number of bytes sent from KrpcDataStreamSender over time - track the total amount of time spent in KrpcDataStreamSender, including time spent waiting for RPC completion. Sample profile of an Exchange node instance: EXCHANGE_NODE (id=21):(Total: 2s284ms, non-child: 64.926ms, % non-child: 2.84%) - ConvertRowBatchTime: 44.380ms - PeakMemoryUsage: 124.04 KB (127021) - RowsReturned: 287.51K (287514) - RowsReturnedRate: 125.88 K/sec Buffer pool: - AllocTime: 1.109ms - CumulativeAllocationBytes: 10.96 MB (11493376) - CumulativeAllocations: 562 (562) - PeakReservation: 112.00 KB (114688) - PeakUnpinnedBytes: 0 - PeakUsedReservation: 112.00 KB (114688) - ReadIoBytes: 0 - ReadIoOps: 0 (0) - ReadIoWaitTime: 0.000ns - WriteIoBytes: 0 - WriteIoOps: 0 (0) - WriteIoWaitTime: 0.000ns Dequeue: BytesDequeued(500.000ms): 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 700.00 KB, 2.00 MB, 3.49 MB, 4.39 MB, 5.86 MB, 6.85 MB - FirstBatchWaitTime: 0.000ns - TotalBytesDequeued: 6.85 MB (7187850) - TotalGetBatchTime: 2s237ms - DataWaitTime: 2s219ms Enqueue: BytesReceived(500.000ms): 0, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 328.73 KB, 963.79 KB, 1.64 MB, 2.09 MB, 2.76 MB, 3.23 MB DeferredQueueSize(500.000ms): 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 0, 0 - DispatchTime: (Avg: 108.593us ; Min: 30.525us ; Max: 1.524ms ; Number of samples: 281) - DeserializeRowBatchTime: 8.395ms - TotalBatchesEnqueued: 281 (281) - TotalBatchesReceived: 281 (281) - TotalBytesReceived: 3.23 MB (3387144) - TotalEarlySenders: 0 (0) - TotalEosReceived: 1 (1) - TotalHasDeferredRPCsTime: 15s446ms - TotalRPCsDeferred: 38 (38) Sample sender's profile: KrpcDataStreamSender (dst_id=21):(Total: 17s923ms, non-child: 604.494ms, % non-child: 3.37%) BytesSent(500.000ms): 0, 0, 0, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 46.54 KB, 46.54 KB, 46.54 KB, 58.31 KB, 58.31 KB, 58.31 KB, 58.31 KB, 58.31 KB, 58.31 KB, 58.31 KB, 974.44 KB, 2.82 MB, 4.93 MB, 6.27 MB, 8.28 MB, 9.69 MB - EosSent: 3 (3) - NetworkThroughput: 4.61 MB/sec - PeakMemoryUsage: 22.57 KB (23112) - RowsSent: 287.51K (287514) - RpcFailure: 0 (0) - RpcRetry: 0 (0) - SerializeBatchTime: 329.162ms - TotalBytesSent: 9.69 MB (10161432) - UncompressedRowBatchSize: 20.56 MB (21563550) Change-Id: I8ba405921b3df920c1e85b940ce9c8d02fc647cd Reviewed-on: http://gerrit.cloudera.org:8080/9690 Reviewed-by: Michael Ho <kwho@cloudera.com> Tested-by: Impala Public Jenkins
Commit: | 258938a | |
---|---|---|
Author: | Sailesh Mukil | |
Committer: | Impala Public Jenkins |
KUDU-2301: (Part-1) Add instrumentation on a per connection level This patch returns the OutboundTransfer queue size on a per connection level and makes them accessible via the DumpRunningRpcs() call. A test is added in rpc-test to ensure that this metric works as expected. A future patch will add more metrics. Change-Id: Iae1a5fe0066adf644a9cac41ad6696e1bbf00465 Reviewed-on: http://gerrit.cloudera.org:8080/9343 Tested-by: Kudu Jenkins Reviewed-by: Todd Lipcon <todd@apache.org> Reviewed-on: http://gerrit.cloudera.org:8080/9383 Reviewed-by: Sailesh Mukil <sailesh@cloudera.com> Reviewed-by: Michael Ho <kwho@cloudera.com> Tested-by: Impala Public Jenkins
Commit: | cf5ef7f | |
---|---|---|
Author: | Sailesh Mukil | |
Committer: | Impala Public Jenkins |
KUDU-2301: (Part-1) Add instrumentation on a per connection level This patch returns the OutboundTransfer queue size on a per connection level and makes them accessible via the DumpRunningRpcs() call. A test is added in rpc-test to ensure that this metric works as expected. A future patch will add more metrics. Change-Id: Iae1a5fe0066adf644a9cac41ad6696e1bbf00465 Reviewed-on: http://gerrit.cloudera.org:8080/9343 Tested-by: Kudu Jenkins Reviewed-by: Todd Lipcon <todd@apache.org> Reviewed-on: http://gerrit.cloudera.org:8080/9383 Reviewed-by: Sailesh Mukil <sailesh@cloudera.com> Reviewed-by: Michael Ho <kwho@cloudera.com> Tested-by: Impala Public Jenkins
Commit: | 8b5f133 | |
---|---|---|
Author: | Lars Volker | |
Committer: | Impala Public Jenkins |
IMPALA-6193: Track memory of incoming data streams This change adds memory tracking to incoming transmit data RPCs when using KRPC. We track memory against a global tracker called "Data Stream Service" until it is handed over to the stream manager. There we track it in a global tracker called "Data Stream Queued RPC Calls" until a receiver registers and takes over the early sender RPCs. Inside the receiver, memory for deferred RPCs is tracked against the fragment instance's memtracker until we unpack the batches and add them to the row batch queue. The DCHECK in MemTracker::Close() ensures that all memory consumed by a tracker gets released eventually. In addition to that, this change adds a custom cluster test that makes sure that queued memory gets tracked by inspecting the peak consumption of the new memtrackers. Change-Id: I2df1204d2483313a8a18e5e3be6cec9e402614c4 Reviewed-on: http://gerrit.cloudera.org:8080/8914 Reviewed-by: Lars Volker <lv@cloudera.com> Tested-by: Impala Public Jenkins
Commit: | 3bfda33 | |
---|---|---|
Author: | Lars Volker | |
Committer: | Impala Public Jenkins |
IMPALA-6193: Track memory of incoming data streams This change adds memory tracking to incoming transmit data RPCs when using KRPC. We track memory against a global tracker called "Data Stream Service" until it is handed over to the stream manager. There we track it in a global tracker called "Data Stream Queued RPC Calls" until a receiver registers and takes over the early sender RPCs. Inside the receiver, memory for deferred RPCs is tracked against the fragment instance's memtracker until we unpack the batches and add them to the row batch queue. The DCHECK in MemTracker::Close() ensures that all memory consumed by a tracker gets released eventually. In addition to that, this change adds a custom cluster test that makes sure that queued memory gets tracked by inspecting the peak consumption of the new memtrackers. Change-Id: I2df1204d2483313a8a18e5e3be6cec9e402614c4 Reviewed-on: http://gerrit.cloudera.org:8080/8914 Reviewed-by: Lars Volker <lv@cloudera.com> Tested-by: Impala Public Jenkins
Commit: | b4ea57a | |
---|---|---|
Author: | Michael Ho | |
Committer: | Impala Public Jenkins |
IMPALA-4856: Port data stream service to KRPC This patch implements a new data stream service which utilizes KRPC. Similar to the thrift RPC implementation, there are 3 major components to the data stream services: KrpcDataStreamSender serializes and sends row batches materialized by a fragment instance to a KrpcDataStreamRecvr. KrpcDataStreamMgr is responsible for routing an incoming row batch to the appropriate receiver. The data stream service runs on the port FLAGS_krpc_port which is 29000 by default. Unlike the implementation with thrift RPC, KRPC provides an asynchronous interface for invoking remote methods. As a result, KrpcDataStreamSender doesn't need to create a thread per connection. There is one connection between two Impalad nodes for each direction (i.e. client and server). Multiple queries can multiplex on the same connection for transmitting row batches between two Impalad nodes. The asynchronous interface also avoids the possibility that a thread is stuck in the RPC code for an extended amount of time without checking for cancellation. A TransmitData() call with KRPC is in essence a trio of RpcController, a serialized protobuf request buffer and a protobuf response buffer. The call is invoked via a DataStreamService proxy object. The serialized tuple offsets and row batches are sent via "sidecars" in KRPC to avoid an extra copy into the serialized request buffer. Each impalad node creates a singleton DataStreamService object at start-up time. All incoming calls are served by a service thread pool created as part of DataStreamService. By default, the number of service threads equals the number of logical cores. The service threads are shared across all queries so the RPC handler should avoid blocking as much as possible. In the thrift RPC implementation, a thrift thread handling a TransmitData() RPC blocks for an extended period of time if the receiver is not yet created when the call arrives. In the KRPC implementation, we store TransmitData() or EndDataStream() requests which arrive before the receiver is ready in a per-receiver early sender list stored in KrpcDataStreamMgr. These RPC calls will be processed and responded to when the receiver is created or when timeout occurs. Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr. If adding a row batch to a queue in KrpcDataStreamRecvr causes the buffer limit to be exceeded, the request will be stashed in a queue for deferred processing. The stashed RPC requests will not be responded to until they are processed so as to exert back pressure on the senders. An alternative would be to reply with an error, requiring the request / row batches to be sent again. This may end up consuming more network bandwidth than the thrift RPC implementation. This change adopts the behavior of allowing one stashed request per sender. All RPC requests and responses are serialized using protobuf. The equivalent of TRowBatch would be ProtoRowBatch which contains a serialized header about the meta-data of the row batch and two Kudu Slice objects which contain pointers to the actual data (i.e. tuple offsets and tuple data). This patch is based on an abandoned patch by Henry Robinson. TESTING ------- * Builds {exhaustive/debug, core/release, asan} passed with FLAGS_use_krpc=true. TO DO ----- * Port some BE tests to KRPC services. Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Reviewed-on: http://gerrit.cloudera.org:8080/8023 Reviewed-by: Michael Ho <kwho@cloudera.com> Tested-by: Impala Public Jenkins
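A sketch of a row-batch transmit request in the spirit of the ProtoRowBatch description above: the lightweight header rides in the message, while tuple offsets and tuple data ride in KRPC sidecars referenced by index. Names, fields, and numbering are assumptions, not the real definitions.

```
// Illustrative sketch; names and field numbers are assumptions.
syntax = "proto2";

message RowBatchHeaderPB {
  optional int32 num_rows = 1;
  optional int64 uncompressed_size = 2;
  optional bool is_compressed = 3;
}

message TransmitDataRequestPB {
  optional string dest_fragment_instance_id = 1;
  optional int32 sender_id = 2;
  optional int32 dest_node_id = 3;      // plan node id of the receiving exchange
  optional RowBatchHeaderPB row_batch_header = 4;
  // Tuple offsets and tuple data are not copied into the request; they are
  // attached as KRPC sidecars and referenced here by index.
  optional int32 tuple_offsets_sidecar_idx = 5;
  optional int32 tuple_data_sidecar_idx = 6;
}

message TransmitDataResponsePB {
  optional int32 status_code = 1;
}

service DataStreamService {
  // EndDataStream() (omitted here) follows the same pattern with its own
  // request/response messages.
  rpc TransmitData(TransmitDataRequestPB) returns (TransmitDataResponsePB);
}
```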
Commit: | dd4c6be | |
---|---|---|
Author: | Michael Ho | |
Committer: | Impala Public Jenkins |
IMPALA-4670: Introduces RpcMgr class This patch introduces a new class, RpcMgr which is the abstraction layer around KRPC core mechanics. It provides an interface RegisterService() for various services to register themselves. Kudu RPC is invoked via an auto-generated interface called proxy. This change implements an inline wrapper for KRPC client to obtain a proxy for a particular service exported by remote server. Last but not least, the RpcMgr will start all registered services if FLAGS_use_krpc is true. This patch hasn't yet added any service except for some test services in rpc-mgr-test. This patch is based on an abandoned patch by Henry Robinson. Testing done: a new backend test is added to exercise the code and demonstrate the way to interact with KRPC framework. Change-Id: I8adb10ae375d7bf945394c38a520f12d29cf7b46 Reviewed-on: http://gerrit.cloudera.org:8080/7901 Reviewed-by: Michael Ho <kwho@cloudera.com> Tested-by: Impala Public Jenkins
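For context on what RegisterService() and the auto-generated proxy operate on, here is a hypothetical test-style service definition of the kind the commit alludes to. The service and message names are made up; the point is that the client proxy and server-side service interface are generated from a .proto declaration like this.

```
// Illustrative, hypothetical service definition; not Impala's actual proto.
syntax = "proto2";

message PingRequestPB {
  optional string payload = 1;
}

message PingResponsePB {
  optional string payload = 1;
}

service PingService {
  // The KRPC code generator typically emits a client-side proxy class and a
  // server-side interface from this declaration; the server implementation
  // is what gets passed to RpcMgr's RegisterService().
  rpc Ping(PingRequestPB) returns (PingResponsePB);
}
```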
Commit: | c1c4815 | |
---|---|---|
Author: | Michael Ho | |
Committer: | Impala Public Jenkins |
KUDU-2065: Support cancellation for outbound RPC call This change implements a new interface RpcController::Cancel() which takes a RpcController as argument and cancels any pending OutboundCall associated with it. RpcController::Cancel() queues a cancellation task scheduled on the reactor thread for that outbound call. Once the task is run, it will cancel the outbound call right away if the RPC hasn't started sending yet or if it has already sent the request and waiting for a response. If cancellation happens when the RPC request is being sent, the RPC will be cancelled only after the RPC has finished sending the request. If the RPC is finished, the cancellation will be a no-op. Change-Id: Iaf53c5b113de10d573bd32fb9b2293572e806fbf Reviewed-on: http://gerrit.cloudera.org:8080/7455 Tested-by: Kudu Jenkins Reviewed-by: Todd Lipcon <todd@apache.org> Reviewed-on: http://gerrit.cloudera.org:8080/7743 Reviewed-by: Sailesh Mukil <sailesh@cloudera.com> Tested-by: Impala Public Jenkins
Commit: | c7db60a | |
---|---|---|
Author: | Henry Robinson | |
Committer: | Impala Public Jenkins |
IMPALA-4669: [KRPC] Import RPC library from kudu@314c9d8 Change-Id: I06ab5b56312e482a27fa484414c338438ad6972c Reviewed-on: http://gerrit.cloudera.org:8080/5718 Reviewed-by: Michael Ho <kwho@cloudera.com> Tested-by: Impala Public Jenkins
Commit: | 84b8155 | |
---|---|---|
Author: | Henry Robinson | |
Committer: | Henry Robinson |
IMPALA-4669: [SECURITY] Import Kudu security library from kudu@314c9d8 The security library provides Kerberos and TLS facilities to the rpc library. Change-Id: I76daeead00f672aa468f5ab6de4d70eac2078cb2 Reviewed-on: http://gerrit.cloudera.org:8080/5716 Reviewed-by: Henry Robinson <henry@cloudera.com> Tested-by: Henry Robinson <henry@cloudera.com>
Commit: | d6abb29 | |
---|---|---|
Author: | Henry Robinson | |
Committer: | Impala Public Jenkins |
IMPALA-4669: [KUTIL] Import kudu_util library from kudu@314c9d8 Update LICENSE.txt and rat_exclude_files.txt Change-Id: I6d89384730b60354b5fae2b1472183d2a561d170 Reviewed-on: http://gerrit.cloudera.org:8080/5714 Reviewed-by: Henry Robinson <henry@cloudera.com> Tested-by: Impala Public Jenkins
Commit: | 02f3e3f | |
---|---|---|
Author: | Henry Robinson | |
Committer: | Henry Robinson |
IMPALA-4758: (1/2) Update gutil/ from Kudu@a1bfd7b * Copy gutil from Kudu * Minimal changes to gutil/CMakeLists.txt Change-Id: Ic708a9c4e76ede17af9b06e0a0a8e9ae7d357960 Reviewed-on: http://gerrit.cloudera.org:8080/5687 Reviewed-by: Dan Hecht <dhecht@cloudera.com> Tested-by: Henry Robinson <henry@cloudera.com>
This commit does not contain any .proto files.
Commit: | e8fe220 | |
---|---|---|
Author: | David Alves | |
Committer: | David Alves |
Merge thirdparty from cdh5-trunk This is the first step towards merging impala-kudu with trunk. These are basically just mechanical changes, pulling from trunk thirdparty and just enough other changes to cmake build scripts or impala-config.sh to make it compile. NOTE: This patch is basically half-way between the impala-kudu build, which doesn't yet use the toolchain, and the impala trunk build, which does. As such this patch doesn't actually build stand-alone and serves merely the purpose of omitting +/- 650K loc from the merge patch itself. Change-Id: Ic794988dcadee16e687a82745b417605772ff325
Commit: | 81f247b | |
---|---|---|
Author: | Martin Grund | |
Committer: | Martin Grund |
Optional Impala Toolchain This patch allows optionally enabling the new Impala binary toolchain. For now there are no major version differences between the toolchain dependencies and what is currently kept in thirdparty. To enable the toolchain, export the variable IMPALA_TOOLCHAIN to the folder where the binaries are available. In addition this patch moves gutil from the thirdparty directory into the source tree of be/src to allow easy propagation of compiler and linker flags. Furthermore, the thrift-cpp target was added as a dependency to all targets that require the generated thrift sources to be available before the build is started. What is the new toolchain: The goal of the toolchain is to homogenize the build environment and to make sure that Impala is built nearly identically on every platform. To achieve this, we limit the flexibility of using the system's host libraries and instead rely on a set of custom produced binaries including the necessary compiler. Change-Id: If2dac920520e4a18be2a9a75b3184a5bd97a065b Reviewed-on: http://gerrit.cloudera.org:8080/427 Reviewed-by: Adar Dembo <adar@cloudera.com> Tested-by: Internal Jenkins Reviewed-by: Martin Grund <mgrund@cloudera.com>