Group Replication is implemented as a MySQL plugin. To avoid disturbing MySQL's existing transaction processing path, Group Replication-specific handling is inserted only at selected points in the flow, which keeps the impact on the original code small; the plugin is driven through a callback (observer) mechanism.
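As a rough illustration of that callback mechanism, the plugin fills a server-side observer structure with its hook functions and registers it when the plugin initializes. The sketch below is modeled on the Trans_observer interface of MySQL 5.7's replication observer API and on the plugin's observer_trans.cc; it is a simplification, not a verbatim excerpt.

/*
  Sketch (simplified) of how Group Replication wires its hooks into the
  server's transaction lifecycle via Trans_observer from replication.h.
*/
Trans_observer trans_observer = {
  sizeof(Trans_observer),

  group_replication_trans_before_dml,      // DML requirement checks
  group_replication_trans_before_commit,   // package + broadcast + certify
  group_replication_trans_before_rollback,
  group_replication_trans_after_commit,
  group_replication_trans_after_rollback,
};

// Called during plugin initialization: once the observer is registered,
// the server invokes the callbacks above at the corresponding points of
// every transaction.
static int register_group_replication_observers(void *plugin_context)
{
  return register_trans_observer(&trans_observer, plugin_context);
}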
During DML execution, the function group_replication_trans_before_dml verifies that the operation satisfies MGR's restrictions.
Before a transaction commits, the function group_replication_trans_before_commit packages the transaction's binlog events, its write set, and the global GTID into a message, and broadcasts that message to all group members for certification.
When another member receives the message, it places it in the incoming queue; the applier thread reads messages from that queue and processes their events one by one. Each event passes through the event handler pipeline, where the corresponding handler processes it.
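The pipeline is a chain of handlers, each of which either consumes an event or forwards it down the chain. A condensed sketch of the pattern follows; the real chain in the plugin is assembled from Event_cataloger, Certification_handler and Applier_handler, and the real interface lives in the plugin's pipeline_interfaces.h, so treat this as a simplification rather than the actual declaration.

class Pipeline_event;
class Continuation;

// Condensed sketch of the chained-handler pattern behind the applier
// pipeline (simplified from the plugin's Event_handler interface).
class Event_handler
{
public:
  virtual ~Event_handler() {}

  // Each stage either consumes the event or forwards it down the chain.
  virtual int handle_event(Pipeline_event *pevent, Continuation *cont)= 0;

  // Link this handler to the next stage of the pipeline.
  void plug(Event_handler *next_handler) { m_next= next_handler; }

protected:
  int next(Pipeline_event *pevent, Continuation *cont)
  {
    return m_next ? m_next->handle_event(pevent, cont) : 0;
  }

private:
  Event_handler *m_next= nullptr;
};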
MySQL Group Replication synchronizes each transaction across members through XCom before the transaction commits, guaranteeing that a majority of members have already stored the transaction's binlog events at commit time, and thus that the members stay consistent with one another. To reduce the coupling between the MySQL and XCom code, mechanisms such as message notification and callback functions are used to isolate the different modules from each other.
MGR's communication layer was originally Corosync; it was only later replaced by XCom.
/*
  Transaction lifecycle events observers.
*/
int group_replication_trans_before_dml(Trans_param *param, int& out)
{
  DBUG_ENTER("group_replication_trans_before_dml");

  out= 0;

  //If group replication has not started, then moving along...
  if (!plugin_is_group_replication_running())
  {
    DBUG_RETURN(0);
  }

  /*
    The first check to be made is if the session binlog is active
    If it is not active, this query is not relevant for the plugin.
  */
  if (!param->trans_ctx_info.binlog_enabled)
  {
    DBUG_RETURN(0);
  }

  /*
    In runtime, check the global variables that can change.
  */
  if ((out+= (param->trans_ctx_info.binlog_format != BINLOG_FORMAT_ROW)))
  {
    log_message(MY_ERROR_LEVEL, "Binlog format should be ROW for Group Replication");
    DBUG_RETURN(0);
  }

  if ((out+= (param->trans_ctx_info.binlog_checksum_options !=
              binary_log::BINLOG_CHECKSUM_ALG_OFF)))
  {
    log_message(MY_ERROR_LEVEL, "binlog_checksum should be NONE for Group Replication");
    DBUG_RETURN(0);
  }

  if ((out+= (param->trans_ctx_info.transaction_write_set_extraction ==
              HASH_ALGORITHM_OFF)))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "A transaction_write_set_extraction algorithm "
                "should be selected when running Group Replication");
    DBUG_RETURN(0);
    /* purecov: end */
  }

  if (local_member_info->has_enforces_update_everywhere_checks() &&
      (out+= (param->trans_ctx_info.tx_isolation == ISO_SERIALIZABLE)))
  {
    log_message(MY_ERROR_LEVEL, "Transaction isolation level (tx_isolation) "
                                "is set to SERIALIZABLE, which is not compatible with Group "
                                "Replication");
    DBUG_RETURN(0);
  }

  /*
    Cycle through all involved tables to assess if they all
    comply with the plugin runtime requirements. For now:
    - The table must be from a transactional engine
    - It must contain at least one primary key
    - It should not contain 'ON DELETE/UPDATE CASCADE' referential action
  */
  for (uint table=0; out == 0 && table < param->number_of_tables; table++)
  {
    if (param->tables_info[table].db_type != DB_TYPE_INNODB)
    {
      log_message(MY_ERROR_LEVEL, "Table %s does not use the InnoDB storage "
                                  "engine. This is not compatible with Group "
                                  "Replication",
                  param->tables_info[table].table_name);
      out++;
    }

    if (param->tables_info[table].number_of_primary_keys == 0)
    {
      log_message(MY_ERROR_LEVEL, "Table %s does not have any PRIMARY KEY. "
                                  "This is not compatible with Group Replication",
                  param->tables_info[table].table_name);
      out++;
    }

    if (local_member_info->has_enforces_update_everywhere_checks() &&
        param->tables_info[table].has_cascade_foreign_key)
    {
      log_message(MY_ERROR_LEVEL, "Table %s has a foreign key with "
                                  "'CASCADE' clause. This is not compatible with Group "
                                  "Replication", param->tables_info[table].table_name);
      out++;
    }
  }

  DBUG_RETURN(0);
}
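Note that the hook always returns 0 even when a requirement is violated: the violations are counted in the out parameter, and the server aborts the statement when the aggregated value is non-zero. Below is a hypothetical skeleton of that server-side aggregation; the real logic is the transaction delegate in sql/rpl_handler.cc, and the names here are purely illustrative.

#include <vector>

// Hypothetical skeleton (not the server's actual code): each registered
// observer's before_dml hook accumulates requirement violations in "out",
// and the server rejects the DML when any violation was flagged.
struct Observer { int (*before_dml)(void *param, int &out); };

static int run_before_dml_hooks(void *param,
                                const std::vector<Observer> &observers)
{
  int out= 0;
  for (const Observer &obs : observers)
  {
    if (obs.before_dml(param, out))  // non-zero: the observer itself failed
      return 1;
  }
  return out != 0;  // any accumulated violation aborts the statement
}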
int group_replication_trans_before_commit(Trans_param *param)
{
  DBUG_ENTER("group_replication_trans_before_commit");
  int error= 0;
  const int pre_wait_error= 1;
  const int post_wait_error= 2;

  DBUG_EXECUTE_IF("group_replication_force_error_on_before_commit_listener",
                  DBUG_RETURN(1););

  DBUG_EXECUTE_IF("group_replication_before_commit_hook_wait",
                  {
                    const char act[]= "now wait_for continue_commit";
                    DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
                  });

  /*
    If the originating id belongs to a thread in the plugin, the transaction
    was already certified. Channel operations can deadlock against
    plugin/applier thread stops so they must remain outside the plugin stop
    lock below.
  */
  Replication_thread_api channel_interface;
  if (GR_APPLIER_CHANNEL == param->rpl_channel_type) {
    // If plugin is stopping, there is no point in update the statistics.
    bool fail_to_lock= shared_plugin_stop_lock->try_grab_read_lock();
    if (!fail_to_lock)
    {
      if (local_member_info->get_recovery_status() == Group_member_info::MEMBER_ONLINE)
      {
        applier_module->get_pipeline_stats_member_collector()
            ->decrement_transactions_waiting_apply();
        applier_module->get_pipeline_stats_member_collector()
            ->increment_transactions_applied();
      }
      shared_plugin_stop_lock->release_read_lock();
    }
    DBUG_RETURN(0);
  }

  if (GR_RECOVERY_CHANNEL == param->rpl_channel_type) {
    DBUG_RETURN(0);
  }

  shared_plugin_stop_lock->grab_read_lock();

  if (is_plugin_waiting_to_set_server_read_mode())
  {
    log_message(MY_ERROR_LEVEL,
                "Transaction cannot be executed while Group Replication is stopping.");
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(1);
  }

  /* If the plugin is not running, before commit should return success. */
  if (!plugin_is_group_replication_running())
  {
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(0);
  }

  DBUG_ASSERT(applier_module != NULL && recovery_module != NULL);

  Group_member_info::Group_member_status member_status=
      local_member_info->get_recovery_status();

  if (member_status == Group_member_info::MEMBER_IN_RECOVERY)
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Transaction cannot be executed while Group Replication is recovering."
                " Try again when the server is ONLINE.");
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(1);
    /* purecov: end */
  }

  if (member_status == Group_member_info::MEMBER_ERROR)
  {
    log_message(MY_ERROR_LEVEL,
                "Transaction cannot be executed while Group Replication is on ERROR state."
                " Check for errors and restart the plugin");
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(1);
  }

  if (member_status == Group_member_info::MEMBER_OFFLINE)
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Transaction cannot be executed while Group Replication is OFFLINE."
                " Check for errors and restart the plugin");
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(1);
    /* purecov: end */
  }

  // Transaction information.
  const ulong transaction_size_limit= get_transaction_size_limit();
  my_off_t transaction_size= 0;

  const bool is_gtid_specified= param->gtid_info.type == GTID_GROUP;
  Gtid gtid= { param->gtid_info.sidno, param->gtid_info.gno };
  if (!is_gtid_specified)
  {
    // Dummy values that will be replaced after certification.
    gtid.sidno= 1;
    gtid.gno= 1;
  }
  const Gtid_specification gtid_specification= { GTID_GROUP, gtid };
  Gtid_log_event *gle= NULL;
  Transaction_context_log_event *tcle= NULL;

  // group replication cache.
  IO_CACHE *cache= NULL;

  // Todo optimize for memory (IO-cache's buf to start with, if not enough then trans mem-root)
  // to avoid New message create/delete and/or its implicit MessageBuffer.
  Transaction_Message transaction_msg;
  enum enum_gcs_error send_error= GCS_OK;

  // Binlog cache.
  bool is_dml= true;
  bool may_have_sbr_stmts= !is_dml;
  IO_CACHE *cache_log= NULL;
  my_off_t cache_log_position= 0;
  bool reinit_cache_log_required= false;
  const my_off_t trx_cache_log_position= my_b_tell(param->trx_cache_log);
  const my_off_t stmt_cache_log_position= my_b_tell(param->stmt_cache_log);
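  /*
    Note (added for this analysis): a transaction writes either to the
    transactional (trx) binlog cache -- row events from InnoDB DML -- or to
    the statement (stmt) binlog cache -- e.g. DDL. Exactly one of the two
    is expected to be non-empty here, and which one it is decides is_dml
    and may_have_sbr_stmts below.
  */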
  if (trx_cache_log_position > 0 && stmt_cache_log_position == 0)
  {
    cache_log= param->trx_cache_log;
    cache_log_position= trx_cache_log_position;
  }
  else if (trx_cache_log_position == 0 && stmt_cache_log_position > 0)
  {
    cache_log= param->stmt_cache_log;
    cache_log_position= stmt_cache_log_position;
    is_dml= false;
    may_have_sbr_stmts= true;
  }
  else
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "We can only use one cache type at a "
                                "time on session %u", param->thread_id);
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(1);
    /* purecov: end */
  }

  applier_module->get_pipeline_stats_member_collector()
      ->increment_transactions_local();

  DBUG_ASSERT(cache_log->type == WRITE_CACHE);
  DBUG_PRINT("cache_log", ("thread_id: %u, trx_cache_log_position: %llu,"
                           " stmt_cache_log_position: %llu",
                           param->thread_id, trx_cache_log_position,
                           stmt_cache_log_position));

  /*
    Open group replication cache.
    Reuse the same cache on each session for improved performance.
  */
  cache= observer_trans_get_io_cache(param->thread_id,
                                     param->cache_log_max_size);
  if (cache == NULL)
  {
    /* purecov: begin inspected */
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  // Reinit binlog cache to read.
  if (reinit_cache(cache_log, READ_CACHE, 0))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Failed to reinit binlog cache log for read "
                                "on session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  /*
    After this, cache_log should be reinit to old saved value when we
    are going out of the function scope.
  */
  reinit_cache_log_required= true;

  // Create transaction context.
  tcle= new Transaction_context_log_event(param->server_uuid,
                                          is_dml,
                                          param->thread_id,
                                          is_gtid_specified);
  if (!tcle->is_valid())
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Failed to create the context of the current "
                "transaction on session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  if (is_dml)
  {
    Transaction_write_set* write_set= get_transaction_write_set(param->thread_id);
    /*
      When GTID is specified we may have empty transactions, that is,
      a transaction may have not write set at all because it didn't
      change any data, it will just persist that GTID as applied.
    */
    if ((write_set == NULL) && (!is_gtid_specified))
    {
      log_message(MY_ERROR_LEVEL, "Failed to extract the set of items written "
                                  "during the execution of the current "
                                  "transaction on session %u", param->thread_id);
      error= pre_wait_error;
      goto err;
    }

    if (write_set != NULL)
    {
      if (add_write_set(tcle, write_set))
      {
        /* purecov: begin inspected */
        cleanup_transaction_write_set(write_set);
        log_message(MY_ERROR_LEVEL, "Failed to gather the set of items written "
                                    "during the execution of the current "
                                    "transaction on session %u", param->thread_id);
        error= pre_wait_error;
        goto err;
        /* purecov: end */
      }
      cleanup_transaction_write_set(write_set);
      DBUG_ASSERT(is_gtid_specified || (tcle->get_write_set()->size() > 0));
    }
    else
    {
      /*
        For empty transactions we should set the GTID may_have_sbr_stmts. See
        comment at binlog_cache_data::may_have_sbr_stmts().
      */
      may_have_sbr_stmts= true;
    }
  }

  // Write transaction context to group replication cache.
  tcle->write(cache);

  // Write Gtid log event to group replication cache.
  gle= new Gtid_log_event(param->server_id, is_dml, 0, 1,
                          may_have_sbr_stmts,
                          gtid_specification);
  gle->write(cache);

  transaction_size= cache_log_position + my_b_tell(cache);
  if (is_dml && transaction_size_limit &&
      transaction_size > transaction_size_limit)
  {
    log_message(MY_ERROR_LEVEL, "Error on session %u. "
                                "Transaction of size %llu exceeds specified limit %lu. "
                                "To increase the limit please adjust "
                                "group_replication_transaction_size_limit option.",
                param->thread_id, transaction_size,
                transaction_size_limit);
    error= pre_wait_error;
    goto err;
  }

  // Reinit group replication cache to read.
  if (reinit_cache(cache, READ_CACHE, 0))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while re-initializing an internal "
                                "cache, for read operations, on session %u",
                param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  // Copy group replication cache to buffer.
  if (transaction_msg.append_cache(cache))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while appending data to an internal "
                                "cache on session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  // Copy binlog cache content to buffer.
  if (transaction_msg.append_cache(cache_log))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while writing binary log cache on "
                                "session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }
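  /*
    Note (added for this analysis): the certification latch ticket registered
    below is what makes the commit synchronous with the group. After the
    broadcast, the session blocks in waitTicket() until the local certifier
    has decided the transaction's fate and releases the ticket.
  */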
  DBUG_ASSERT(certification_latch != NULL);
  if (certification_latch->registerTicket(param->thread_id))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Unable to register for getting notifications "
                                "regarding the outcome of the transaction on "
                                "session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

#ifndef DBUG_OFF
  DBUG_EXECUTE_IF("test_basic_CRUD_operations_sql_service_interface",
                  {
                    DBUG_SET("-d,test_basic_CRUD_operations_sql_service_interface");
                    DBUG_ASSERT(!sql_command_check());
                  };);

  DBUG_EXECUTE_IF("group_replication_before_message_broadcast",
                  {
                    const char act[]= "now wait_for waiting";
                    DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
                  });
#endif

  /*
    Check if member needs to throttle its transactions to avoid
    cause starvation on the group.
  */
  applier_module->get_flow_control_module()->do_wait();

  //Broadcast the Transaction Message
  send_error= gcs_module->send_message(transaction_msg);
  if (send_error == GCS_MESSAGE_TOO_BIG)
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error broadcasting transaction to the group "
                                "on session %u. Message is too big.",
                param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }
  else if (send_error == GCS_NOK)
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while broadcasting the transaction to "
                                "the group on session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  shared_plugin_stop_lock->release_read_lock();

  DBUG_ASSERT(certification_latch != NULL);
  if (certification_latch->waitTicket(param->thread_id))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while waiting for conflict detection "
                                "procedure to finish on session %u",
                param->thread_id);
    error= post_wait_error;
    goto err;
    /* purecov: end */
  }

err:
  // Reinit binlog cache to write (revert what we did).
  if (reinit_cache_log_required &&
      reinit_cache(cache_log, WRITE_CACHE, cache_log_position))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while re-initializing an internal "
                                "cache, for write operations, on session %u",
                param->thread_id);
    /* purecov: end */
  }
  observer_trans_put_io_cache(cache);

  delete gle;
  delete tcle;

  if (error)
  {
    if (error == pre_wait_error)
      shared_plugin_stop_lock->release_read_lock();

    DBUG_ASSERT(certification_latch != NULL);
    // Release and remove certification latch ticket.
    certification_latch->releaseTicket(param->thread_id);
    certification_latch->waitTicket(param->thread_id);
  }

  DBUG_EXECUTE_IF("group_replication_after_before_commit_hook",
                  {
                    const char act[]= "now wait_for signal.commit_continue";
                    DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
                  });
  DBUG_RETURN(error);
}
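On a receiving member, the delivered message is unpacked back into binlog events and queued to the applier pipeline described earlier. There, Certification_handler picks out the events relevant to certification and forwards everything else to the next handler in the chain: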
int
Certification_handler::handle_event(Pipeline_event *pevent, Continuation *cont)
{
  DBUG_ENTER("Certification_handler::handle_event");

  Log_event_type ev_type= pevent->get_event_type();
  switch (ev_type)
  {
    case binary_log::TRANSACTION_CONTEXT_EVENT:
      DBUG_RETURN(handle_transaction_context(pevent, cont));
    case binary_log::GTID_LOG_EVENT:
      DBUG_RETURN(handle_transaction_id(pevent, cont));
    case binary_log::VIEW_CHANGE_EVENT:
      DBUG_RETURN(extract_certification_info(pevent, cont));
    default:
      next(pevent, cont);
      DBUG_RETURN(0);
  }
}
/**
  This method handles transaction context events by storing them
  so they can be used on the next handler.

  @param[in] pevent   the event to be injected
  @param[in] cont     the object used to wait

  @return the operation status
    @retval 0      OK
    @retval !=0    Error
*/
int handle_transaction_context(Pipeline_event *pevent, Continuation *cont);

/**
  This method handles transaction identifier events; it does two tasks:
    1. Using the transaction context previously processed and stored,
       validate that this transaction does not conflict with any other;
    2. If the transaction does not conflict and is allowed to commit,
       inform the server of that decision and update the
       transaction identifier if needed.

  @param[in] pevent   the event to be injected
  @param[in] cont     the object used to wait

  @return the operation status
    @retval 0      OK
    @retval !=0    Error
*/
int handle_transaction_id(Pipeline_event *pevent, Continuation *cont);

/*
  This method extracts the certification db and the sequence number from
  the certifier, injecting them in a View change event to be sent to a
  possible joiner.
*/
int extract_certification_info(Pipeline_event *pevent, Continuation *cont);
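To make the conflict check in handle_transaction_id concrete, here is a deliberately simplified, self-contained sketch of first-committer-wins certification. It is not the plugin's implementation (the real logic lives in the certifier, and both the container and the version notion are simplified here): each write-set item is mapped to the version at which it was last certified, and a transaction is rejected when some item it wrote was certified after the snapshot the transaction was based on.

#include <map>
#include <string>
#include <vector>

// Last version at which each write-set item was certified (conceptual
// stand-in for the certification info the certifier maintains).
static std::map<std::string, long long> certification_info;

// Returns true when the transaction may commit: none of the items it wrote
// were certified by a transaction its snapshot had not yet seen.
static bool certify(long long snapshot_version,
                    const std::vector<std::string> &write_set,
                    long long new_version)
{
  for (const std::string &item : write_set)
  {
    std::map<std::string, long long>::const_iterator it=
        certification_info.find(item);
    if (it != certification_info.end() && it->second > snapshot_version)
      return false;  // conflict: item was updated after our snapshot
  }
  // No conflicts: record this transaction's writes at its new version.
  for (const std::string &item : write_set)
    certification_info[item]= new_version;
  return true;
}

Because every member sees the same total order of messages from XCom and applies this deterministic rule, all members reach the same commit/abort decision without any further coordination.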
Based on: MySQL · 引擎特性 · Group Replication内核解析; MySQL · 引擎特性 · Group Replication内核解析之二
Source: https://www.cnblogs.com/gaogao67/p/15098202.html