MySQL MGR: Data Synchronization Internals

Date: 2021-08-04 16:00:41

How MGR Synchronization Works

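Group Replication is implemented as a MySQL plugin. To avoid disturbing MySQL's existing transaction processing, Group Replication-specific handling is added selectively at certain points in the flow, which keeps the impact on existing code small. The server invokes this handling through callback functions.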

During DML operations, the function group_replication_trans_before_dml checks that the operation satisfies MGR's restrictions.

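Before a transaction commits, the function group_replication_trans_before_commit packs the transaction's binlog events, its WriteSet, and the global GTID into a single message, and sends that message to all nodes for certification.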
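
The packing step can be pictured with a minimal sketch. It assumes a hypothetical length-prefixed layout and hypothetical names (TransactionParts, pack, unpack). The real plugin instead writes a Transaction_context_log_event and a Gtid_log_event followed by the binlog cache contents, as the function listing further down shows. Only the ordering (context with write set, then GTID, then binlog data) is taken from that listing.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical container for the three pieces named in the text.
struct TransactionParts {
  std::string context;  // transaction context, including the write set
  std::string gtid;     // GTID event
  std::string binlog;   // binlog events produced by the transaction
};

// Append one chunk as a 4-byte little-endian length followed by the bytes.
void append_chunk(std::string& buf, const std::string& chunk) {
  const std::uint32_t len = static_cast<std::uint32_t>(chunk.size());
  for (int i = 0; i < 4; ++i)
    buf.push_back(static_cast<char>((len >> (8 * i)) & 0xFF));
  buf += chunk;
}

// Read one chunk starting at pos; returns false if the buffer is truncated.
bool read_chunk(const std::string& buf, std::size_t& pos, std::string& out) {
  if (buf.size() - pos < 4) return false;
  std::uint32_t len = 0;
  for (int i = 0; i < 4; ++i)
    len |= static_cast<std::uint32_t>(static_cast<unsigned char>(buf[pos + i]))
           << (8 * i);
  pos += 4;
  if (buf.size() - pos < len) return false;
  out = buf.substr(pos, len);
  pos += len;
  return true;
}

// Build one message: context first, then GTID, then the binlog payload.
std::string pack(const TransactionParts& parts) {
  std::string buf;
  append_chunk(buf, parts.context);
  append_chunk(buf, parts.gtid);
  append_chunk(buf, parts.binlog);
  return buf;
}

// Split a received message back into its three parts, in the same order.
bool unpack(const std::string& buf, TransactionParts& out) {
  std::size_t pos = 0;
  return read_chunk(buf, pos, out.context) && read_chunk(buf, pos, out.gtid) &&
         read_chunk(buf, pos, out.binlog) && pos == buf.size();
}
```

Putting the context and GTID ahead of the binlog data lets a receiver decide whether the transaction is acceptable before it touches the row events.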

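When the other nodes receive the message, they put it into the incomingQ queue. The applier thread reads messages from incomingQ and processes their events in order. Each event passes through the event handler pipeline, which runs the following steps in sequence: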

  • apply_data_packet
  • Event_cataloger::handle_event
  • Certification_handler::handle_event
  • Applier_handler::handle_event
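
The queue-plus-pipeline structure can be sketched as a chain of handlers fed from a FIFO queue. This is a simplified illustration, not the plugin's code: the Event, Handler, Cataloger, CertifierStub, ApplierStub, and run_applier names are hypothetical, the "conflict" prefix is an invented stand-in for a failed certification, and the real handlers operate on Pipeline_event objects with a Continuation.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Hypothetical, simplified event; the real pipeline carries Pipeline_event.
struct Event {
  std::string payload;
};

// One pipeline stage. Each stage decides whether to forward the event.
class Handler {
 public:
  virtual ~Handler() = default;
  virtual int handle_event(Event& ev) = 0;
  void set_next(Handler* next_handler) { next_ = next_handler; }

 protected:
  // Pass the event to the following stage, if any. Returns 0 on success.
  int next(Event& ev) { return next_ ? next_->handle_event(ev) : 0; }

 private:
  Handler* next_ = nullptr;
};

// Stand-in for Event_cataloger: here it only counts events and forwards them.
class Cataloger : public Handler {
 public:
  int seen = 0;
  int handle_event(Event& ev) override {
    ++seen;
    return next(ev);
  }
};

// Stand-in for Certification_handler: drops events marked as conflicting.
class CertifierStub : public Handler {
 public:
  int handle_event(Event& ev) override {
    if (ev.payload.rfind("conflict", 0) == 0) return 0;  // rejected, not forwarded
    return next(ev);
  }
};

// Stand-in for Applier_handler: records what would be applied.
class ApplierStub : public Handler {
 public:
  explicit ApplierStub(std::vector<std::string>& applied) : applied_(applied) {}
  int handle_event(Event& ev) override {
    applied_.push_back(ev.payload);
    return 0;
  }

 private:
  std::vector<std::string>& applied_;
};

// Drain the incoming queue in FIFO order, as a single applier thread would.
int run_applier(std::deque<Event>& incoming, Handler& first) {
  while (!incoming.empty()) {
    Event ev = incoming.front();
    incoming.pop_front();
    if (int error = first.handle_event(ev)) return error;
  }
  return 0;
}
```

Because every node drains the same totally ordered queue through the same stages, each node reaches the same accept or reject decision for every transaction.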

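Before committing a transaction, MySQL Group Replication uses XCom to synchronize it between nodes. This ensures that a majority of nodes have already stored the transaction's binlog events before the commit, which keeps transactions consistent across nodes. To reduce coupling between the MySQL and XCom code, mechanisms such as message notification and callback functions are used to isolate the modules from each other.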
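
As a small worked example of the majority condition, the sketch below computes whether a given number of acknowledgements forms a majority and how many node failures a group can tolerate. The function names are hypothetical and the code only illustrates the arithmetic; it is not taken from XCom.

```cpp
#include <cassert>
#include <cstddef>

// True when acks is a strict majority of group_size (more than half).
bool has_majority(std::size_t acks, std::size_t group_size) {
  return group_size > 0 && acks >= group_size / 2 + 1;
}

// Largest number of failed members that still leaves a majority alive.
std::size_t tolerated_failures(std::size_t group_size) {
  return group_size == 0 ? 0 : (group_size - 1) / 2;
}
```

For example, a group of three members needs two acknowledgements and tolerates one failure, while a group of four needs three acknowledgements and still tolerates only one failure.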

Early versions of MGR used Corosync as the communication layer and only later switched to XCom, mainly for the following reasons:

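  • MySQL supports many platforms, including Windows, and Corosync does not support all of them.
  • Corosync does not support SSL, only symmetric encryption, which does not meet MySQL's security requirements.
  • Corosync uses UDP, and UDP multicast is not a good fit for cloud environments.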

The function group_replication_trans_before_dml

/*
  Transaction lifecycle events observers.
*/

int group_replication_trans_before_dml(Trans_param *param, int& out)
{
  DBUG_ENTER("group_replication_trans_before_dml");

  out= 0;

  //If group replication has not started, then moving along...
  if (!plugin_is_group_replication_running())
  {
    DBUG_RETURN(0);
  }

  /*
   The first check to be made is if the session binlog is active
   If it is not active, this query is not relevant for the plugin.
   */
  if(!param->trans_ctx_info.binlog_enabled)
  {
    DBUG_RETURN(0);
  }

  /*
   In runtime, check the global variables that can change.
   */
  if( (out+= (param->trans_ctx_info.binlog_format != BINLOG_FORMAT_ROW)) )
  {
    log_message(MY_ERROR_LEVEL, "Binlog format should be ROW for Group Replication");

    DBUG_RETURN(0);
  }

  if( (out+= (param->trans_ctx_info.binlog_checksum_options !=
                                                   binary_log::BINLOG_CHECKSUM_ALG_OFF)) )
  {
    log_message(MY_ERROR_LEVEL, "binlog_checksum should be NONE for Group Replication");

    DBUG_RETURN(0);
  }

  if ((out+= (param->trans_ctx_info.transaction_write_set_extraction ==
              HASH_ALGORITHM_OFF)))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "A transaction_write_set_extraction algorithm "
                "should be selected when running Group Replication");
    DBUG_RETURN(0);
    /* purecov: end */
  }

  if (local_member_info->has_enforces_update_everywhere_checks() &&
      (out+= (param->trans_ctx_info.tx_isolation == ISO_SERIALIZABLE)))
  {
    log_message(MY_ERROR_LEVEL, "Transaction isolation level (tx_isolation) "
                "is set to SERIALIZABLE, which is not compatible with Group "
                "Replication");
    DBUG_RETURN(0);
  }
  /*
    Cycle through all involved tables to assess if they all
    comply with the plugin runtime requirements. For now:
    - The table must be from a transactional engine
    - It must contain at least one primary key
    - It should not contain 'ON DELETE/UPDATE CASCADE' referential action
   */
  for(uint table=0; out == 0 && table < param->number_of_tables; table++)
  {
    if (param->tables_info[table].db_type != DB_TYPE_INNODB)
    {
      log_message(MY_ERROR_LEVEL, "Table %s does not use the InnoDB storage "
                                  "engine. This is not compatible with Group "
                                  "Replication",
                  param->tables_info[table].table_name);
      out++;
    }

    if(param->tables_info[table].number_of_primary_keys == 0)
    {
      log_message(MY_ERROR_LEVEL, "Table %s does not have any PRIMARY KEY. This is not compatible with Group Replication",
                  param->tables_info[table].table_name);
      out++;
    }
    if (local_member_info->has_enforces_update_everywhere_checks() &&
        param->tables_info[table].has_cascade_foreign_key)
    {
      log_message(MY_ERROR_LEVEL, "Table %s has a foreign key with"
                  " 'CASCADE' clause. This is not compatible with Group"
                  " Replication", param->tables_info[table].table_name);
      out++;
    }
  }

  DBUG_RETURN(0);
}

The function group_replication_trans_before_commit

int group_replication_trans_before_commit(Trans_param *param)
{
  DBUG_ENTER("group_replication_trans_before_commit");
  int error= 0;
  const int pre_wait_error= 1;
  const int post_wait_error= 2;

  DBUG_EXECUTE_IF("group_replication_force_error_on_before_commit_listener",
                  DBUG_RETURN(1););

  DBUG_EXECUTE_IF("group_replication_before_commit_hook_wait",
                  {
                    const char act[]= "now wait_for continue_commit";
                    DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
                  });

  /*
    If the originating id belongs to a thread in the plugin, the transaction
    was already certified. Channel operations can deadlock against
    plugin/applier thread stops so they must remain outside the plugin stop
    lock below.
  */
  Replication_thread_api channel_interface;
  if (GR_APPLIER_CHANNEL == param->rpl_channel_type) {

    // If plugin is stopping, there is no point in update the statistics.
    bool fail_to_lock= shared_plugin_stop_lock->try_grab_read_lock();
    if (!fail_to_lock)
    {
      if (local_member_info->get_recovery_status() == Group_member_info::MEMBER_ONLINE)
      {
        applier_module->get_pipeline_stats_member_collector()
            ->decrement_transactions_waiting_apply();
        applier_module->get_pipeline_stats_member_collector()
            ->increment_transactions_applied();
      }
      shared_plugin_stop_lock->release_read_lock();
    }

    DBUG_RETURN(0);
  }
  if (GR_RECOVERY_CHANNEL == param->rpl_channel_type) {
    DBUG_RETURN(0);
  }

  shared_plugin_stop_lock->grab_read_lock();

  if (is_plugin_waiting_to_set_server_read_mode())
  {
    log_message(MY_ERROR_LEVEL,
                "Transaction cannot be executed while Group Replication is stopping.");
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(1);
  }

  /* If the plugin is not running, before commit should return success. */
  if (!plugin_is_group_replication_running())
  {
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(0);
  }

  DBUG_ASSERT(applier_module != NULL && recovery_module != NULL);
  Group_member_info::Group_member_status member_status=
      local_member_info->get_recovery_status();

  if (member_status == Group_member_info::MEMBER_IN_RECOVERY)
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Transaction cannot be executed while Group Replication is recovering."
                " Try again when the server is ONLINE.");
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(1);
    /* purecov: end */
  }

  if (member_status == Group_member_info::MEMBER_ERROR)
  {
    log_message(MY_ERROR_LEVEL,
                "Transaction cannot be executed while Group Replication is on ERROR state."
                " Check for errors and restart the plugin");
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(1);
  }

  if (member_status == Group_member_info::MEMBER_OFFLINE)
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Transaction cannot be executed while Group Replication is OFFLINE."
                " Check for errors and restart the plugin");
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(1);
    /* purecov: end */
  }

  // Transaction information.
  const ulong transaction_size_limit= get_transaction_size_limit();
  my_off_t transaction_size= 0;

  const bool is_gtid_specified= param->gtid_info.type == GTID_GROUP;
  Gtid gtid= { param->gtid_info.sidno, param->gtid_info.gno };
  if (!is_gtid_specified)
  {
    // Dummy values that will be replaced after certification.
    gtid.sidno= 1;
    gtid.gno= 1;
  }

  const Gtid_specification gtid_specification= { GTID_GROUP, gtid };
  Gtid_log_event *gle= NULL;

  Transaction_context_log_event *tcle= NULL;

  // group replication cache.
  IO_CACHE *cache= NULL;

  // Todo optimize for memory (IO-cache's buf to start with, if not enough then trans mem-root)
  // to avoid New message create/delete and/or its implicit MessageBuffer.
  Transaction_Message transaction_msg;

  enum enum_gcs_error send_error= GCS_OK;

  // Binlog cache.
  bool is_dml= true;
  bool may_have_sbr_stmts= !is_dml;
  IO_CACHE *cache_log= NULL;
  my_off_t cache_log_position= 0;
  bool reinit_cache_log_required= false;
  const my_off_t trx_cache_log_position= my_b_tell(param->trx_cache_log);
  const my_off_t stmt_cache_log_position= my_b_tell(param->stmt_cache_log);

  if (trx_cache_log_position > 0 && stmt_cache_log_position == 0)
  {
    cache_log= param->trx_cache_log;
    cache_log_position= trx_cache_log_position;
  }
  else if (trx_cache_log_position == 0 && stmt_cache_log_position > 0)
  {
    cache_log= param->stmt_cache_log;
    cache_log_position= stmt_cache_log_position;
    is_dml= false;
    may_have_sbr_stmts= true;
  }
  else
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "We can only use one cache type at a "
                                "time on session %u", param->thread_id);
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(1);
    /* purecov: end */
  }

  applier_module->get_pipeline_stats_member_collector()
      ->increment_transactions_local();

  DBUG_ASSERT(cache_log->type == WRITE_CACHE);
  DBUG_PRINT("cache_log", ("thread_id: %u, trx_cache_log_position: %llu,"
                           " stmt_cache_log_position: %llu",
                           param->thread_id, trx_cache_log_position,
                           stmt_cache_log_position));

  /*
    Open group replication cache.
    Reuse the same cache on each session for improved performance.
  */
  cache= observer_trans_get_io_cache(param->thread_id,
                                     param->cache_log_max_size);
  if (cache == NULL)
  {
    /* purecov: begin inspected */
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  // Reinit binlog cache to read.
  if (reinit_cache(cache_log, READ_CACHE, 0))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Failed to reinit binlog cache log for read "
                                "on session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  /*
    After this, cache_log should be reinit to old saved value when we
    are going out of the function scope.
  */
  reinit_cache_log_required= true;

  // Create transaction context.
  tcle= new Transaction_context_log_event(param->server_uuid,
                                          is_dml,
                                          param->thread_id,
                                          is_gtid_specified);
  if (!tcle->is_valid())
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Failed to create the context of the current "
                "transaction on session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  if (is_dml)
  {
    Transaction_write_set* write_set= get_transaction_write_set(param->thread_id);
    /*
      When GTID is specified we may have empty transactions, that is,
      a transaction may have not write set at all because it didn't
      change any data, it will just persist that GTID as applied.
    */
    if ((write_set == NULL) && (!is_gtid_specified))
    {
      log_message(MY_ERROR_LEVEL, "Failed to extract the set of items written "
                                  "during the execution of the current "
                                  "transaction on session %u", param->thread_id);
      error= pre_wait_error;
      goto err;
    }

    if (write_set != NULL)
    {
      if (add_write_set(tcle, write_set))
      {
        /* purecov: begin inspected */
        cleanup_transaction_write_set(write_set);
        log_message(MY_ERROR_LEVEL, "Failed to gather the set of items written "
                                    "during the execution of the current "
                                    "transaction on session %u", param->thread_id);
        error= pre_wait_error;
        goto err;
        /* purecov: end */
      }
      cleanup_transaction_write_set(write_set);
      DBUG_ASSERT(is_gtid_specified || (tcle->get_write_set()->size() > 0));
    }
    else
    {
      /*
        For empty transactions we should set the GTID may_have_sbr_stmts. See
        comment at binlog_cache_data::may_have_sbr_stmts().
      */
      may_have_sbr_stmts= true;
    }
  }

  // Write transaction context to group replication cache.
  tcle->write(cache);

  // Write Gtid log event to group replication cache.
  gle= new Gtid_log_event(param->server_id, is_dml, 0, 1,
                          may_have_sbr_stmts,
                          gtid_specification);
  gle->write(cache);

  transaction_size= cache_log_position + my_b_tell(cache);
  if (is_dml && transaction_size_limit &&
     transaction_size > transaction_size_limit)
  {
    log_message(MY_ERROR_LEVEL, "Error on session %u. "
                "Transaction of size %llu exceeds specified limit %lu. "
                "To increase the limit please adjust group_replication_transaction_size_limit option.",
                param->thread_id, transaction_size,
                transaction_size_limit);
    error= pre_wait_error;
    goto err;
  }

  // Reinit group replication cache to read.
  if (reinit_cache(cache, READ_CACHE, 0))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while re-initializing an internal "
                                "cache, for read operations, on session %u",
                                param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  // Copy group replication cache to buffer.
  if (transaction_msg.append_cache(cache))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while appending data to an internal "
                                "cache on session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  // Copy binlog cache content to buffer.
  if (transaction_msg.append_cache(cache_log))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while writing binary log cache on "
                                "session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }


  DBUG_ASSERT(certification_latch != NULL);
  if (certification_latch->registerTicket(param->thread_id))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Unable to register for getting notifications "
                                "regarding the outcome of the transaction on "
                                "session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

#ifndef DBUG_OFF
  DBUG_EXECUTE_IF("test_basic_CRUD_operations_sql_service_interface",
                  {
                    DBUG_SET("-d,test_basic_CRUD_operations_sql_service_interface");
                    DBUG_ASSERT(!sql_command_check());
                  };);

  DBUG_EXECUTE_IF("group_replication_before_message_broadcast",
                  {
                    const char act[]= "now wait_for waiting";
                    DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
                  });
#endif

  /*
    Check if member needs to throttle its transactions to avoid
    cause starvation on the group.
  */
  applier_module->get_flow_control_module()->do_wait();

  //Broadcast the Transaction Message
  send_error= gcs_module->send_message(transaction_msg);
  if (send_error == GCS_MESSAGE_TOO_BIG)
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error broadcasting transaction to the group "
                                "on session %u. Message is too big.",
                                param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }
  else if (send_error == GCS_NOK)
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while broadcasting the transaction to "
                                "the group on session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  shared_plugin_stop_lock->release_read_lock();

  DBUG_ASSERT(certification_latch != NULL);
  if (certification_latch->waitTicket(param->thread_id))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while waiting for conflict detection "
                                "procedure to finish on session %u",
                                param->thread_id);
    error= post_wait_error;
    goto err;
    /* purecov: end */
  }

err:
  // Reinit binlog cache to write (revert what we did).
  if (reinit_cache_log_required &&
      reinit_cache(cache_log, WRITE_CACHE, cache_log_position))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while re-initializing an internal "
                                "cache, for write operations, on session %u",
                                param->thread_id);
    /* purecov: end */
  }
  observer_trans_put_io_cache(cache);
  delete gle;
  delete tcle;

  if (error)
  {
    if (error == pre_wait_error)
      shared_plugin_stop_lock->release_read_lock();

    DBUG_ASSERT(certification_latch != NULL);
    // Release and remove certification latch ticket.
    certification_latch->releaseTicket(param->thread_id);
    certification_latch->waitTicket(param->thread_id);
  }

  DBUG_EXECUTE_IF("group_replication_after_before_commit_hook",
                 {
                    const char act[]= "now wait_for signal.commit_continue";
                    DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
                 });
  DBUG_RETURN(error);
}
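
The commit path above blocks on certification_latch until the certification outcome arrives. The sketch below shows one way such a ticket latch could work, using a mutex and a condition variable. TicketLatch and its method names are hypothetical; this is not the plugin's actual latch class. It supports releasing a ticket before waiting on it, which is the order the error path above uses (releaseTicket followed by waitTicket), and it assumes at most one waiter per ticket.

```cpp
#include <cassert>
#include <condition_variable>
#include <map>
#include <mutex>

// Hypothetical per-session ticket latch keyed by thread id.
class TicketLatch {
 public:
  // Returns 0 on success, 1 if the key is already registered.
  int register_ticket(unsigned key) {
    std::lock_guard<std::mutex> guard(mutex_);
    return released_.emplace(key, false).second ? 0 : 1;
  }

  // Marks the ticket as released and wakes the waiter.
  // Returns 1 if the key is unknown.
  int release_ticket(unsigned key) {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      auto it = released_.find(key);
      if (it == released_.end()) return 1;
      it->second = true;
    }
    cond_.notify_all();
    return 0;
  }

  // Blocks until the ticket is released, then removes it.
  // Returns 1 if the key is unknown.
  int wait_ticket(unsigned key) {
    std::unique_lock<std::mutex> lock(mutex_);
    if (released_.find(key) == released_.end()) return 1;
    cond_.wait(lock, [&] { return released_.at(key); });
    released_.erase(key);
    return 0;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cond_;
  std::map<unsigned, bool> released_;  // key -> has the ticket been released?
};
```

In this sketch the committing session would register its ticket before broadcasting, and the thread that learns the certification result would release it.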

The function Certification_handler::handle_event

int
Certification_handler::handle_event(Pipeline_event *pevent, Continuation *cont)
{
  DBUG_ENTER("Certification_handler::handle_event");

  Log_event_type ev_type= pevent->get_event_type();
  switch (ev_type)
  {
    case binary_log::TRANSACTION_CONTEXT_EVENT:
      DBUG_RETURN(handle_transaction_context(pevent, cont));
    case binary_log::GTID_LOG_EVENT:
      DBUG_RETURN(handle_transaction_id(pevent, cont));
    case binary_log::VIEW_CHANGE_EVENT:
      DBUG_RETURN(extract_certification_info(pevent, cont));
    default:
      next(pevent, cont);
      DBUG_RETURN(0);
  }
}

/**
This method handles transaction context events by storing them
so they can be used on next handler.

@param[in] pevent   the event to be injected
@param[in] cont     the object used to wait

@return the operation status
  @retval 0      OK
  @retval !=0    Error
*/
int handle_transaction_context(Pipeline_event *pevent, Continuation *cont);


/**
This method handles transaction identifier events. It does two tasks:
  1. Using the transaction context previously processed and stored,
     validate that this transaction does not conflict with any other;
  2. If the transaction does not conflict and it is allowed to commit,
     inform the server of that decision and update the
     transaction identifier if needed.

@param[in] pevent   the event to be injected
@param[in] cont     the object used to wait

@return the operation status
  @retval 0      OK
  @retval !=0    Error
*/
int handle_transaction_id(Pipeline_event *pevent, Continuation *cont);


/*
This method extracts the certification db and the sequence number from
the certifier injecting them in a View change event to be sent to a possible
joiner.
*/
int extract_certification_info(Pipeline_event *pevent, Continuation *cont);
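
The conflict check described for handle_transaction_id can be illustrated with a minimal write-set certifier. This sketch makes simplifying assumptions that are not from the source: versions are plain integers rather than GTID sets, write-set items are strings rather than hashes, and the Certifier type and its members are hypothetical names. It shows the general technique (reject a transaction if any item it wrote was modified after the snapshot it executed on), not the plugin's actual algorithm.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical certifier: remembers which certified transaction last wrote each item.
struct Certifier {
  std::unordered_map<std::string, std::uint64_t> last_writer;
  std::uint64_t next_seq = 1;

  // snapshot: sequence number of the last transaction the originating node
  // had applied when this transaction executed (an assumption of this sketch).
  // Returns the assigned sequence number, or 0 if the transaction conflicts.
  std::uint64_t certify(const std::vector<std::string>& write_set,
                        std::uint64_t snapshot) {
    for (const std::string& item : write_set) {
      auto it = last_writer.find(item);
      // Conflict: someone changed this item after our snapshot was taken.
      if (it != last_writer.end() && it->second > snapshot) return 0;
    }
    const std::uint64_t seq = next_seq++;
    for (const std::string& item : write_set) last_writer[item] = seq;
    return seq;
  }
};
```

If every node feeds transactions to such a certifier in the same delivery order, all nodes make the same decision without exchanging further messages.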



Original article: https://www.cnblogs.com/gaogao67/p/15098202.html
