1000字范文,内容丰富有趣,学习的好帮手!
1000字范文 > 跨数据库分布式实时事务 - 基于RabbitMQ实时消息队列服务实现

跨数据库分布式实时事务 - 基于RabbitMQ实时消息队列服务实现

时间:2022-10-18 21:44:01

相关推荐

跨数据库分布式实时事务 - 基于RabbitMQ实时消息队列服务实现

如果有任何问题,或者更完善的方案 请加我创建的群 568752806 来一起探讨吧

电脑配置

普通个人台式机电脑1台,配置如下:

处理器:Intel(R) Core(TM) i5-4750 CPU @ 3.2GHz 4核处理器

内存: 共32G 金士顿HyperX 骇客神条 DDR3 1866 8G x 4

固态硬盘:GLOWAY STK512GS3-S7 实际容量 476G

主板:Z97-PRO Wi-Fi ac

网卡:Broadcom 802.11ac

Intel(R) Ethernet Connection(2) I218-V

操作系统:Windows 10 64位教育版系统

搭建环境

VMware Workstation 12 Pro 版本 12.5.7 build-5813279

创建4台虚拟Cent OS 7 系统

主机名称 IP 内存 磁盘大小 说明

192.168.10.20 10G 100G PHP for 开发者

192.168.10.101 4G 80G Mariadb for db1

192.168.10.102 4G 80G Mariadb for db2

192.168.10.105 6G 80G RabbitMQ for 消息存储

在主机 192.168.10.20 上配置 IP到自定义域名之间的映射关系

[root@contoso ~]#cat > /etc/hosts

127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4

::1 localhost localhost.localdomain localhost6 localhost6.localdomain6

192.168.10.20 user.

192.168.10.20

192.168.10.20

192.168.10.20 contoso

优化内核 配置

[root@contoso ~]#cat >> /etc/security/limits.conf

* soft nofile 65536

* hard nofile 65536

[root@contoso ~]#ulimit -n 65536

[root@contoso ~]#cat > /etc/sysctl.conf

# sysctl settings are defined through files in

# /usr/lib/sysctl.d/, /run/sysctl.d/, and /etc/sysctl.d/.

#

# Vendors settings live in /usr/lib/sysctl.d/.

# To override a whole file, create a new file with the same in

# /etc/sysctl.d/ and put new settings there. To override

# only specific settings, add a file with a lexically later

# name in /etc/sysctl.d/ and put new settings there.

#

# For more information, see sysctl.conf(5) and sysctl.d(5).

net.ipv4.tcp_syn_retries = 1

net.ipv4.tcp_synack_retries = 1

net.ipv4.tcp_keepalive_time = 600

net.ipv4.tcp_keepalive_probes = 3

net.ipv4.tcp_keepalive_intvl = 15

net.ipv4.tcp_retries2 = 5

net.ipv4.tcp_fin_timeout = 2

net.ipv4.tcp_max_tw_buckets = 36000

net.ipv4.tcp_tw_recycle = 1

net.ipv4.tcp_tw_reuse = 1

net.ipv4.tcp_max_orphans = 32768

net.ipv4.tcp_syncookies = 1

net.ipv4.tcp_max_syn_backlog = 16384

net.ipv4.tcp_wmem = 8192 131072 16777216

net.ipv4.tcp_rmem = 32768 131072 16777216

net.ipv4.tcp_mem = 786432 1048576 1572864

net.ipv4.ip_local_port_range = 1024 65000

[root@contoso ~]#sysctl -p

安装PHP的运行环境,注意不是在root用户登录的环境下安装PHP,而是在管理员myth登录的环境下安装PHP

[myth@contoso ~]$yum install /pub/epel/epel-release-latest-7.noarch.rpm

[myth@contoso ~]$yum install /enterprise/remi-release-7.rpm

[myth@contoso ~]$yum --enablerepo=remi-php71,remi,epel -y install php php-devel php-mysql php-fpm php-pecl-xdebug php-gd php-intl php-freetype php-mcrypt php-mbstring php-pecl-memcached php-pecl-redis php-pecl-swoole

配置远程调试服务器

[myth@contoso ~]$cat > /etc/php.d/15-xdebug.ini

; Enable xdebug extension module

zend_extension=xdebug.so

xdebug.remote_autostart=1

xdebug.remote_enable=1

xdebug.remote_connect_back=1

xdebug.remote_port=9001

xdebug.remote_handler=dbgp

; see /docs/all_settings

Apache优化:修改最大并发连接数,以下是大型网站的配置参数,参数值除以2属于中型网站的配置参数,如有问题可以再调整

[root@contoso ~]#ll /etc/httpd/conf.modules.d

total 32

-rw-r--r-- 1 root root 3739 Apr 12 21:50 00-base.conf

-rw-r--r-- 1 root root 139 Apr 12 21:50 00-dav.conf

-rw-r--r-- 1 root root 41 Apr 12 21:50 00-lua.conf

-rw-r--r-- 1 root root 950 Aug 30 03:43 00-mpm.conf

-rw-r--r-- 1 root root 957 Apr 12 21:50 00-proxy.conf

-rw-r--r-- 1 root root 88 Apr 12 21:50 00-systemd.conf

-rw-r--r-- 1 root root 451 Apr 12 21:50 01-cgi.conf

-rw-r--r-- 1 root root 423 Aug 2 18:21 15-php.conf

[root@contoso ~]#cat > /etc/httpd/conf.modules.d/00-mpm.conf

# Select the MPM module which should be used by uncommenting exactly

# one of the following LoadModule lines:

# prefork MPM: Implements a non-threaded, pre-forking web server

# See: /docs/2.4/mod/prefork.html

LoadModule mpm_prefork_module modules/mod_mpm_prefork.so

<IfModule mpm_prefork_module>

StartServers 100

MinSpareServers 100

MaxSpareServers 120

ServerLimit 3000

MaxClients 3000

MaxRequestsPerChild 500000

</IfModule>

# worker MPM: Multi-Processing Module implementing a hybrid

# multi-threaded multi-process web server

# See: /docs/2.4/mod/worker.html

#

#LoadModule mpm_worker_module modules/mod_mpm_worker.so

# event MPM: A variant of the worker MPM with the goal of consuming

# threads only for connections with active processing

# See: /docs/2.4/mod/event.html

#

#LoadModule mpm_event_module modules/mod_mpm_event.so

[root@contoso ~]#

在主机 192.168.10.101 上配置 IP到自定义域名之间的映射关系

[root@mariadb1 ~]#cat > /etc/hosts

127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4

::1 localhost localhost.localdomain localhost6 localhost6.localdomain6

192.168.10.101

[root@mariadb1 ~]#

优化内核 配置

[root@mariadb1 ~]#cat >> /etc/security/limits.conf

* soft nofile 65536

* hard nofile 65536

[root@mariadb1 ~]#ulimit -n 65536

[root@mariadb1 ~]#cat > /etc/sysctl.conf

# sysctl settings are defined through files in

# /usr/lib/sysctl.d/, /run/sysctl.d/, and /etc/sysctl.d/.

#

# Vendors settings live in /usr/lib/sysctl.d/.

# To override a whole file, create a new file with the same in

# /etc/sysctl.d/ and put new settings there. To override

# only specific settings, add a file with a lexically later

# name in /etc/sysctl.d/ and put new settings there.

#

# For more information, see sysctl.conf(5) and sysctl.d(5).

net.ipv4.tcp_syn_retries = 1

net.ipv4.tcp_synack_retries = 1

net.ipv4.tcp_keepalive_time = 600

net.ipv4.tcp_keepalive_probes = 3

net.ipv4.tcp_keepalive_intvl = 15

net.ipv4.tcp_retries2 = 5

net.ipv4.tcp_fin_timeout = 2

net.ipv4.tcp_max_tw_buckets = 36000

net.ipv4.tcp_tw_recycle = 1

net.ipv4.tcp_tw_reuse = 1

net.ipv4.tcp_max_orphans = 32768

net.ipv4.tcp_syncookies = 1

net.ipv4.tcp_max_syn_backlog = 16384

net.ipv4.tcp_wmem = 8192 131072 16777216

net.ipv4.tcp_rmem = 32768 131072 16777216

net.ipv4.tcp_mem = 786432 1048576 1572864

net.ipv4.ip_local_port_range = 1024 65000

[root@mariadb1 ~]#sysctl -p

安装MariaDB

官网给出的baseurl地址下载太慢了

[root@mariadb1 ~]#cat > /etc/yum.repos.d/MariaDB.repo

# MariaDB 10.2 CentOS repository list - created -07-08 12:50 UTC

# /mariadb/repositories/

[mariadb]

name = MariaDB

baseurl = /10.2/centos7-amd64

gpgkey=/RPM-GPG-KEY-MariaDB

gpgcheck=1

清华的baseurl下载速度很快

[root@mariadb1 ~]# cat > /etc/yum.repos.d/MariaDB.repo

# MariaDB 10.2 CentOS repository list - created -07-08 12:50 UTC

# /mariadb/repositories/

[mariadb]

name = MariaDB

baseurl = https://mirrors.tuna./mariadb/mariadb-10.2.7/yum/centos7-amd64/

gpgkey=/RPM-GPG-KEY-MariaDB

gpgcheck=1

[root@mariadb1 ~]#yum install -y MariaDB-server MariaDB-client

接下来配置数据库服务器存储数据的编码格式,配置查询日志 慢查询日志 binlog日志存储路径,默认这些功能他们是不开启的

[root@mariadb1~]#mkdir -p /var/log/mariadb && touch /var/log/mariadb/queries.log && touch /var/log/mariadb/mariadb-error.log && touch /var/log/mariadb/mariadb-slow.log && touch /var/log/mariadb/mariadb-log-bin && touch /var/log/mariadb/mariadb-log-bin.index && chown -R mysql:mysql /var/log/mariadb && chmod 755 /var/log/mariadb && chmod 666 /var/log/mariadb/*

[root@mariadb1~]#

[root@mariadb1~]#cat > /etc/f.d/f

#

# These groups are read by MariaDB server.

# Use it for options that only the server (but not clients) should see

#

# See the examples of server f files in /usr/share/mysql/

#

# this is read by the standalone daemon and embedded servers

[server]

# this is only for the mysqld standalone daemon

[mysqld]

character-set-server=utf8

lower-case-table-names=1

log-bin=/var/log/mariadb/mariadb-log-bin

log-bin-index=/var/log/mariadb/mariadb-log-bin.index

log-error=/var/log/mariadb/mariadb-error.log

general-log=ON

general-log-file=/var/log/mariadb/queries.log

log-output=file

slow-query-log=ON

slow-query-log-file=/var/log/mariadb/mariadb-slow.log

long_query_time=1

#

# * Galera-related settings

#

[galera]

# Mandatory settings

#wsrep_on=ON

#wsrep_provider=

#wsrep_cluster_address=

#binlog_format=row

#default_storage_engine=InnoDB

#innodb_autoinc_lock_mode=2

#

# Allow server to accept connections on all interfaces.

#

#bind-address=0.0.0.0

#

# Optional setting

#wsrep_slave_threads=1

#innodb_flush_log_at_trx_commit=0

# this is only for embedded server

[embedded]

# This group is only read by MariaDB servers, not by MySQL.

# If you use the same .cnf file for MySQL and MariaDB,

# you can put MariaDB-only options here

[mariadb]

# This group is only read by MariaDB-10.1 servers.

# If you use the same .cnf file for MariaDB of different versions,

# use this group for options that older servers don't understand

[mariadb-10.1]

[root@mariadb1~]#

在主机 192.168.10.102 上配置 IP到自定义域名之间的映射关系

[root@mariadb2 ~]#cat > /etc/hosts

127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4

::1 localhost localhost.localdomain localhost6 localhost6.localdomain6

192.168.10.102

[root@mariadb2 ~]#

优化内核 配置

[root@mariadb2 ~]#cat >> /etc/security/limits.conf

* soft nofile 65536

* hard nofile 65536

[root@mariadb2 ~]#ulimit -n 65536

[root@mariadb2 ~]#cat > /etc/sysctl.conf

# sysctl settings are defined through files in

# /usr/lib/sysctl.d/, /run/sysctl.d/, and /etc/sysctl.d/.

#

# Vendors settings live in /usr/lib/sysctl.d/.

# To override a whole file, create a new file with the same in

# /etc/sysctl.d/ and put new settings there. To override

# only specific settings, add a file with a lexically later

# name in /etc/sysctl.d/ and put new settings there.

#

# For more information, see sysctl.conf(5) and sysctl.d(5).

net.ipv4.tcp_syn_retries = 1

net.ipv4.tcp_synack_retries = 1

net.ipv4.tcp_keepalive_time = 600

net.ipv4.tcp_keepalive_probes = 3

net.ipv4.tcp_keepalive_intvl = 15

net.ipv4.tcp_retries2 = 5

net.ipv4.tcp_fin_timeout = 2

net.ipv4.tcp_max_tw_buckets = 36000

net.ipv4.tcp_tw_recycle = 1

net.ipv4.tcp_tw_reuse = 1

net.ipv4.tcp_max_orphans = 32768

net.ipv4.tcp_syncookies = 1

net.ipv4.tcp_max_syn_backlog = 16384

net.ipv4.tcp_wmem = 8192 131072 16777216

net.ipv4.tcp_rmem = 32768 131072 16777216

net.ipv4.tcp_mem = 786432 1048576 1572864

net.ipv4.ip_local_port_range = 1024 65000

[root@mariadb2 ~]#sysctl -p

安装MariaDB

官网给出的baseurl地址下载太慢了

[root@mariadb2 ~]#cat > /etc/yum.repos.d/MariaDB.repo

# MariaDB 10.2 CentOS repository list - created -07-08 12:50 UTC

# /mariadb/repositories/

[mariadb]

name = MariaDB

baseurl = /10.2/centos7-amd64

gpgkey=/RPM-GPG-KEY-MariaDB

gpgcheck=1

清华的baseurl下载速度很快

[root@mariadb2 ~]# cat > /etc/yum.repos.d/MariaDB.repo

# MariaDB 10.2 CentOS repository list - created -07-08 12:50 UTC

# /mariadb/repositories/

[mariadb]

name = MariaDB

baseurl = https://mirrors.tuna./mariadb/mariadb-10.2.7/yum/centos7-amd64/

gpgkey=/RPM-GPG-KEY-MariaDB

gpgcheck=1

[root@mariadb2 ~]#yum install -y MariaDB-server MariaDB-client

接下来配置数据库服务器存储数据的编码格式,配置查询日志 慢查询日志 binlog日志存储路径,默认这些功能他们是不开启的

[root@mariadb2 ~]#mkdir -p /var/log/mariadb && touch /var/log/mariadb/queries.log && touch /var/log/mariadb/mariadb-error.log && touch /var/log/mariadb/mariadb-slow.log && touch /var/log/mariadb/mariadb-log-bin && touch /var/log/mariadb/mariadb-log-bin.index && chown -R mysql:mysql /var/log/mariadb && chmod 755 /var/log/mariadb && chmod 666 /var/log/mariadb/*

[root@mariadb2 ~]#

[root@mariadb2~]#cat > /etc/f.d/f

#

# These groups are read by MariaDB server.

# Use it for options that only the server (but not clients) should see

#

# See the examples of server f files in /usr/share/mysql/

#

# this is read by the standalone daemon and embedded servers

[server]

# this is only for the mysqld standalone daemon

[mysqld]

character-set-server=utf8

lower-case-table-names=1

log-bin=/var/log/mariadb/mariadb-log-bin

log-bin-index=/var/log/mariadb/mariadb-log-bin.index

log-error=/var/log/mariadb/mariadb-error.log

general-log=ON

general-log-file=/var/log/mariadb/queries.log

log-output=file

slow-query-log=ON

slow-query-log-file=/var/log/mariadb/mariadb-slow.log

long_query_time=1

#

# * Galera-related settings

#

[galera]

# Mandatory settings

#wsrep_on=ON

#wsrep_provider=

#wsrep_cluster_address=

#binlog_format=row

#default_storage_engine=InnoDB

#innodb_autoinc_lock_mode=2

#

# Allow server to accept connections on all interfaces.

#

#bind-address=0.0.0.0

#

# Optional setting

#wsrep_slave_threads=1

#innodb_flush_log_at_trx_commit=0

# this is only for embedded server

[embedded]

# This group is only read by MariaDB servers, not by MySQL.

# If you use the same .cnf file for MySQL and MariaDB,

# you can put MariaDB-only options here

[mariadb]

# This group is only read by MariaDB-10.1 servers.

# If you use the same .cnf file for MariaDB of different versions,

# use this group for options that older servers don't understand

[mariadb-10.1]

[root@mariadb2 ~]#

在主机 192.168.10.105 上配置 IP到自定义域名之间的映射关系

[root@rabbitmq1 ~]#cat > /etc/hosts

127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4

::1 localhost localhost.localdomain localhost6 localhost6.localdomain6

192.168.10.105

[root@rabbitmq1 ~]#

优化内核 配置

[root@rabbitmq1 ~]#cat >> /etc/security/limits.conf

* soft nofile 65536

* hard nofile 65536

[root@rabbitmq1 ~]#ulimit -n 65536

[root@rabbitmq1 ~]#cat > /etc/sysctl.conf

# sysctl settings are defined through files in

# /usr/lib/sysctl.d/, /run/sysctl.d/, and /etc/sysctl.d/.

#

# Vendors settings live in /usr/lib/sysctl.d/.

# To override a whole file, create a new file with the same in

# /etc/sysctl.d/ and put new settings there. To override

# only specific settings, add a file with a lexically later

# name in /etc/sysctl.d/ and put new settings there.

#

# For more information, see sysctl.conf(5) and sysctl.d(5).

net.ipv4.tcp_syn_retries = 1

net.ipv4.tcp_synack_retries = 1

net.ipv4.tcp_keepalive_time = 600

net.ipv4.tcp_keepalive_probes = 3

net.ipv4.tcp_keepalive_intvl = 15

net.ipv4.tcp_retries2 = 5

net.ipv4.tcp_fin_timeout = 2

net.ipv4.tcp_max_tw_buckets = 36000

net.ipv4.tcp_tw_recycle = 1

net.ipv4.tcp_tw_reuse = 1

net.ipv4.tcp_max_orphans = 32768

net.ipv4.tcp_syncookies = 1

net.ipv4.tcp_max_syn_backlog = 16384

net.ipv4.tcp_wmem = 8192 131072 16777216

net.ipv4.tcp_rmem = 32768 131072 16777216

net.ipv4.tcp_mem = 786432 1048576 1572864

net.ipv4.ip_local_port_range = 1024 65000

[root@rabbitmq1 ~]#sysctl -p

消息服务器的安装请按照在CentOS 7系统上安装RabbitMQ /zhengzizhi/article/details/77018658

在主机 192.168.10.20 上配置虚拟主机:

[root@contoso ~]#cat > /etc/httpd/conf.d/httpd-vhosts.conf

<Directory "/home/myth/www/think">

Options +Indexes +FollowSymLinks

Order allow,deny

Allow from all

AllowOverride All

Require all granted

</Directory>

<VirtualHost *:80>

ServerAdmin zhengzizhi@

DocumentRoot "/home/myth/www/think/public"

ServerName

ServerAlias

ErrorLog "/home/myth/log/httpd/contoso-error_log"

CustomLog "/home/myth/log/httpd/contoso-access_log" common

</VirtualHost>

<Directory "/home/myth/www/think">

Options +Indexes +FollowSymLinks

Order allow,deny

Allow from all

AllowOverride All

Require all granted

</Directory>

<VirtualHost *:80>

ServerAdmin zhengzizhi@

DocumentRoot "/home/myth/www/think/public"

ServerName

ServerAlias

ErrorLog "/home/myth/log/httpd/corp-contoso-error_log"

CustomLog "/home/myth/log/httpd/corp-contoso-access_log" common

</VirtualHost>

<Directory "/home/myth/www/think">

Options +Indexes +FollowSymLinks

Order allow,deny

Allow from all

AllowOverride All

Require all granted

</Directory>

<VirtualHost *:80>

ServerAdmin zhengzizhi@

DocumentRoot "/home/myth/www/think/public"

ServerName user.

ServerAlias user.

ErrorLog "/home/myth/log/httpd/user-corp-contoso-error_log"

CustomLog "/home/myth/log/httpd/user-corp-contoso-access_log" common

</VirtualHost>

[root@contoso ~]#

[myth@contoso ~]$mkdir -p /home/myth/log/httpd#注意:日志文件的目录需要提前创建好,

[root@contoso ~]#sed -i -- 's/^#ServerName :80/ServerName :80/g' /etc/httpd/conf/httpd.conf

直接贴消费者的实现代码 think\apps\index\controller\Consumer.php:

<?php/** 作者:zhengzizhi@* 日期:二O一七年 七夕节*/namespace app\index\controller;use PDO;use think\Db;use PhpAmqpLib\Connection\AMQPStreamConnection;use PhpAmqpLib\Message\AMQPMessage;class Consumer{private $connection;private $channel;public function __construct(){}/*** Special remind: before updating consumer code to debug,first,please close consumer listen! * Listens for incoming messages* * [root@rabbitmq1 ~]# rabbitmqctl delete_vhost / && rabbitmqctl add_vhost / && rabbitmqctl set_permissions -p / guest '.*' '.*' '.*'* [root@rabbitmq1 ~]# systemctl restart rabbitmq-server && rabbitmqadmin list exchanges* * [root@contoso ~]# cd /home/myth/www/think && php public/index.php index/Consumer/listen*/public function listen(){$this-> connection = new AMQPStreamConnection('192.168.10.105', 5672, 'guest', 'guest');$this->channel = $this-> connection->channel();/*** Declares queue, creates if needed** @param string $queue* @param bool $passive* @param bool $durable* @param bool $exclusive* @param bool $auto_delete* @param bool $nowait* @param array $arguments* @param int $ticket* @return mixed|null*/$this->channel->queue_declare('bank.transfers1',#queue - Should be unique in direct exchangefalse, #passive - false Don't check if a queue with the same name existstrue, #durable - true The queue will survive(exist) server restartsfalse, #exclusive - false The queue might be accessed by other channelsfalse #auto_delete - false The queue won't be deleted once the channel is closed);/*** Declares exchange** @param string $exchange* @param string $type* @param bool $passive* @param bool $durable* @param bool $auto_delete* @param bool $internal* @param bool $nowait* @param array $arguments* @param int $ticket* @return mixed|null*/$this->channel->exchange_declare('corp1.fanout', #exchange - That is the exchange(corp.direct)'fanout',#type - That is the type(direct) of exchange(corp.direct)false, #passive - false Don't check if a exchange with the same name existstrue,#durable - true The exchange will survive(exist) server restartsfalse#auto_delete - false The exchange won't be deleted once the channel is closed);/*** Binds queue to an exchange** @param string $queue* @param string $exchange* @param string $routing_key* @param bool $nowait* @param array $arguments* @param int $ticket* @return mixed|null*/$this->channel->queue_bind('bank.transfers1', 'corp1.fanout');/*** Specifies QoS* don't dispatch a new message to a worker until it has processed and* acknowledged the previous one. Instead, it will dispatch it to the* next worker that is not still busy.** @param int $prefetch_size* @param int $prefetch_count* @param bool $a_global* @return mixed*/$this->channel->basic_qos(null, #prefetch size - prefetch window size in octets, null meaning "no specific limit"1,#prefetch count - prefetch window in terms of whole messagesnull #a_global - null to mean that the QoS settings should apply per-consumer#a_global - true to mean that the QoS settings should apply per-channel);/*** Starts a queue consumer** @param string $queue* @param string $consumer_tag* @param bool $no_local* @param bool $no_ack* @param bool $exclusive* @param bool $nowait* @param callback|null $callback* @param int|null $ticket* @param array $arguments* @return mixed|string*/$this->channel->basic_consume('bank.transfers1',#queue - get the messages from the queue(bank.transfers)'',#consumer_tag - Consumer identifierfalse, #no_local - Don't receive messages published by this consumerfalse, #no_ack - false acks turned on, - true turned off. send a proper acknowledgment from the worker, once we're done with a taskfalse, #exclusive - false The queue(bank.transfers) may be accessed by the all connectionsfalse, #nowait - false Don't wait for a server response[$this, 'callback1'] #callback - A PHP callback);$this->channel->queue_declare('bank.transfers2', #queue - Should be unique in direct exchangefalse, #passive - false Don't check if a queue with the same name existstrue, #durable - true The queue will survive(exist) server restartsfalse, #exclusive - false The queue might be accessed by other channelsfalse #auto_delete - false The queue won't be deleted once the channel is closed);$this->channel->exchange_declare('corp2.fanout', #exchange - That is the exchange(corp.direct)'fanout', #type - That is the type(direct) of exchange(corp.direct)false,#passive - false Don't check if a exchange with the same name existstrue, #durable - true The exchange will survive(exist) server restartsfalse #auto_delete - false The exchange won't be deleted once the channel is closed);$this->channel->queue_bind('bank.transfers2', 'corp2.fanout');$this->channel->basic_consume('bank.transfers2',#queue - get the messages from the queue(bank.transfers)'',#consumer_tag - Consumer identifierfalse, #no_local - Don't receive messages published by this consumerfalse, #no_ack - false acks turned on, - true turned off. send a proper acknowledgment from the worker, once we're done with a taskfalse, #exclusive - false The queue(bank.transfers) may be accessed by the all connectionsfalse, #nowait - false Don't wait for a server response[$this, 'callback2'] #callback - A PHP callback);// 'Consuming from queue';# Loop as long as the channel has callbacks registered# After 10 seconds there will be a timeout exception# $channel->wait(null, false, 10)while(count($this->channel->callbacks)) {// 'Waiting for incoming messages'$this->channel->wait();}$this->channel->close();$this->connection->close();}/*** Executes when a message is received.** @param AMQPMessage $req*/public function callback1(AMQPMessage $req) {$account = json_decode($req->body);$msg_id = $req->get('correlation_id');$user_from = $account->user_from;$amount = $account->amount; $status = $account->status;$user_to = $account->user_to; $_isSuccess = 1;if($account->status == 0){$status = 1;$db2 = Db::connect('db2');$db2->startTrans(); try{$cnt = $db2->query('SELECT COUNT(*) AS cnt FROM think_message_apply a WHERE a.msg_id = ?',[$msg_id]);if ($cnt[0] == ['cnt'=>0]) {$db2->execute('INSERT INTO think_message_apply(msg_id,user_from,amount,status,user_to)VALUES(?,?,?,?,?)',[$msg_id,$user_from,$amount,$status,$user_to]);}$db2->commit();} catch (\Exception $e){$db2->rollback();$_isSuccess = 0;}}if($_isSuccess == 1 && $account->status == 0){$status = 1;/** Creating a reply message with the same correlation id than the incoming message*/$msg = new AMQPMessage(json_encode(['success' => true,'data' => ['msg_id' => $msg_id,'user_from' => $user_from,'amount' => $amount,'status' => $status,'user_to' => $user_to,],]), #message['correlation_id' => $msg_id]);/*** Publishes a message to the same channel from the incoming message** @param AMQPMessage $msg* @param string $exchange* @param string $routing_key* @param bool $mandatory* @param bool $immediate* @param int $ticket*/$req->delivery_info['channel']->basic_publish($msg, #msg'', #exchange$req->get('reply_to') #routing_key);//回复一条入账成功的消息给生产者(消息发送者) /*** Acknowledges one or more messages to delivery_tag* If a consumer dies without sending an acknowledgement the AMQP broker* will redeliver it to another consumer or, if none are available at the* time, the broker will wait until at least one consumer is registered* for the same queue before attempting redelivery** @param string $delivery_tag* @param bool $multiple*/$req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag'] #delivery_tag = '1');//确认一条回复消息已经发送} else {$status = 0;/** Creating a reply message with the same correlation id than the incoming message*/$msg = new AMQPMessage(json_encode(['success' => false,'data' => ['msg_id' => $msg_id,'user_from' => $user_from,'amount' => $amount,'status' => $status,'user_to' => $user_to,],]), #message['correlation_id' => $msg_id]);/*** Publishes a message to the same channel from the incoming message** @param AMQPMessage $msg* @param string $exchange* @param string $routing_key* @param bool $mandatory* @param bool $immediate* @param int $ticket*/$req->delivery_info['channel']->basic_publish($msg, #msg'', #exchange$req->get('reply_to') #routing_key);/*** Acknowledges one or more messages to delivery_tag* If a consumer dies without sending an acknowledgement the AMQP broker* will redeliver it to another consumer or, if none are available at the* time, the broker will wait until at least one consumer is registered* for the same queue before attempting redelivery** @param string $delivery_tag* @param bool $multiple*/$req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag'] #delivery_tag = '1');}}/*** Executes when a message is received.** @param AMQPMessage $req*/public function callback2(AMQPMessage $req) {$account = json_decode($req->body);$msg_id = $req->get('correlation_id');$user_from = $account->user_from;$amount = $account->amount; $status = $account->status;$user_to = $account->user_to; $_isSuccess = 1;if($account->status == 1){$status = 2;$db2 = Db::connect('db2');$db2->startTrans();try{$db2->execute('UPDATE think_account a SET a.amount = a.amount + ? WHERE a.user_id = ?',[$amount,$user_to]);$db2->execute('UPDATE think_message_apply b SET b.STATUS = ? WHERE b.msg_id = ?',[$status,$msg_id]);$db2->commit();} catch (\Exception $e){$db2->rollback();$_isSuccess = 0;}}if($_isSuccess == 1 && $account->status == 1){$status = 2;/** Creating a reply message with the same correlation id than the incoming message*/$msg = new AMQPMessage(json_encode(['success' => true,'data' => ['msg_id' => $msg_id,'user_from' => $user_from,'amount' => $amount,'status' => $status,'user_to' => $user_to,],]), #message['correlation_id' => $msg_id]);/*** Publishes a message to the same channel from the incoming message** @param AMQPMessage $msg* @param string $exchange* @param string $routing_key* @param bool $mandatory* @param bool $immediate* @param int $ticket*/$req->delivery_info['channel']->basic_publish($msg, #msg'', #exchange$req->get('reply_to') #routing_key);//回复一条入账成功的消息给生产者 /*** Acknowledges one or more messages to delivery_tag* If a consumer dies without sending an acknowledgement the AMQP broker* will redeliver it to another consumer or, if none are available at the* time, the broker will wait until at least one consumer is registered* for the same queue before attempting redelivery** @param string $delivery_tag* @param bool $multiple*/$req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag'] #delivery_tag = '2');//确认一条回复消息已经发送} else {$status = 1;/** Creating a reply message with the same correlation id than the incoming message*/$msg = new AMQPMessage(json_encode(['success' => false,'data' => ['msg_id' => $msg_id,'user_from' => $user_from,'amount' => $amount,'status' => $status,'user_to' => $user_to,],]), #message['correlation_id' => $msg_id]);/*** Publishes a message to the same channel from the incoming message** @param AMQPMessage $msg* @param string $exchange* @param string $routing_key* @param bool $mandatory* @param bool $immediate* @param int $ticket*/$req->delivery_info['channel']->basic_publish($msg, #msg'', #exchange$req->get('reply_to') #routing_key);/*** Acknowledges one or more messages to delivery_tag* If a consumer dies without sending an acknowledgement the AMQP broker* will redeliver it to another consumer or, if none are available at the* time, the broker will wait until at least one consumer is registered* for the same queue before attempting redelivery** @param string $delivery_tag* @param bool $multiple*/$req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag'] #delivery_tag = '2');//确认一条回复消息已经发送}}}

直接贴生产者的实现代码 think\apps\index\controller\Producer.php:

<?php/** 作者:zhengzizhi@* 日期:二O一七年 七夕节*/namespace app\index\controller;use PhpAmqpLib\Connection\AMQPStreamConnection;use PhpAmqpLib\Message\AMQPMessage;use think\Request;use think\Db;use PDO;class Producer{private $connection;private $channel;private $callback1_queue;private $callback2_queue;private $pass_msg1 = true;private $pass_msg2 = true;/*** @var string*/private $response;private $waiting1;private $waiting2;/*** @var string*/private $corr_id;private $suffix1 = ['01','02','03','04','05','06','07','08','09'];private $suffix2 = ['10','20','30','40','50','60','70','80','90'];public function __construct() {$this->connection = new AMQPStreamConnection('192.168.10.105', 5672, 'guest', 'guest');$this->channel = $this->connection->channel();}/*** @return string* * [root@contoso ~]# chown -R apache:apache /home/myth/www/think/apps/bank-data/data.csv && ll /home/myth/www/think/apps* [root@contoso ~]# chmod -R 0755 /home/myth/www/think/apps/bank-data && ll /home/myth/www/think/apps/bank-data* [root@contoso ~]# cat /home/myth/www/think/apps/bank-data/data.csv* [root@contoso ~]# cat /dev/null > /home/myth/www/think/apps/bank-data/data.csv** [root@mariadbxxx ~]# cat /dev/null > /var/log/mariadb/queries.log && cat /dev/null > /var/log/mariadb/mariadb-slow.log && cat /dev/null > /var/log/mariadb/mariadb-error.log* [root@mariadbxxx ~]# mysql -uroot -p123456 -h127.0.0.1 -e "reset master"* * GET /index/producer/transfer?account[user_from]=1&account[amount]=1024&account[status]=0&account[user_to]=2* * [root@rabbitmq1 ~]# rabbitmqadmin list bindings* [root@rabbitmq1 ~]# rabbitmqadmin list queues* [root@rabbitmq1 ~]# rabbitmqadmin get queue=bank.transfers1 requeue=true count=30* [root@rabbitmq1 ~]# rabbitmqadmin get queue=bank.transfers2 requeue=true count=10* * [myth@contoso ~]$ ab -r -t 7200 -s 7200 -k -n 100000 -c 500 "/index/producer/transfer?account[user_from]=1&account[amount]=1024&account[status]=0&account[user_to]=2"*/public function transfer(Request $request){$this->response = null;$msg_id = session_create_id();//uniqid(); //$index = random_int(1,9);/** $this->corr_id has a value like 53e26b393313a*/$this->corr_id = $msg_id;$user_from = $request->param('account.user_from');$amount = $request->param('account.amount');$status = $request->param('account.status');$user_to = $request->param('account.user_to');$account = ['msg_id' => $msg_id,'user_from' => $user_from,'amount' => $amount,'status' => $status,'user_to' => $user_to,]; $this->channel->exchange_declare("corp1.fanout",'fanout',false,true,false);list($this->callback1_queue,, ) = $this->channel->queue_declare('', #queue $msg_id.$this->suffix1[$index]false, #passivetrue, #durabletrue, #exclusivefalse#auto delete);$this->channel->queue_declare("bank.transfers1", #queuefalse, #passivetrue, #durablefalse, #exclusivefalse #auto delete);$this->channel->queue_bind('', "corp1.fanout");$this->channel->queue_bind("bank.transfers1", "corp1.fanout");/** create a message with two properties: reply_to, which is set to the* callback queue and correlation_id, which is set to a unique value for* every request*/$msg1 = new AMQPMessage(json_encode($account), #body['correlation_id' => $this->corr_id,'reply_to' => $this->callback1_queue,'delivery_mode' => 2,]#properties);/*** Publishes a message** @param AMQPMessage $msg* @param string $exchange* @param string $routing_key* @param bool $mandatory* @param bool $immediate* @param int $ticket*/$this->channel->basic_publish($msg1, #message"corp1.fanout", #exchange"bank.transfers1",#routing keytrue, #mandatoryfalse);$this->channel->basic_consume($this->callback1_queue, #queue'', #consumer_tag = amq.ctag-bzBXVZr5iF7R16bq1NYgYwfalse, #no localfalse, #no ackfalse, #exclusivefalse, #no wait[$this, 'onCallback1']#callback);$this->waiting1 = false;while(!$this->waiting1) {$this->channel->wait();}if($this->pass_msg1 == false){$this->channel->close();$this->connection->close();return $this->response;}$this->channel->exchange_declare("cor2.fanout",'fanout',false,true,false);list($this->callback2_queue,, ) = $this->channel->queue_declare('', #queue $msg_id.$this->suffix2[$index]false, #passivetrue, #durabletrue, #exclusivefalse#auto delete);$this->channel->queue_declare("bank.transfers2", #queuefalse,#passivetrue,#durablefalse,#exclusivefalse#auto delete);$this->channel->queue_bind('', "corp2.fanout");$this->channel->queue_bind("bank.transfers2", "corp2.fanout");$account['status'] = 1;$msg2 = new AMQPMessage(json_encode($account), #body['correlation_id' => $this->corr_id,'reply_to' => $this->callback2_queue,'delivery_mode' => 2,]#properties);$this->channel->basic_publish($msg2, #message"corp2.fanout", #exchange "bank.transfers2",#routing keytrue, #mandatoryfalse);$this->channel->basic_consume($this->callback2_queue, #queue'', #consumer_tag = amq.ctag-bzBXVZr5iF7R16bq1NYgYwfalse, #no localfalse, #no ackfalse, #exclusivefalse, #no wait[$this, 'onCallback2']#callback);$this->waiting2 = false;while(!$this->waiting2) {$this->channel->wait();}$this->channel->close();$this->connection->close();return $this->response;}/*** When a message appears, it checks the correlation_id property. If it* matches the value from the request it returns the response to the* application.** @param AMQPMessage $rep*/public function onCallback1(AMQPMessage $rep) {if($rep->get('correlation_id') == $this->corr_id) { $this->waiting1 = true;$body = json_decode($rep->body);if($body->success == true){$msg_id = $body->data->msg_id;$user_from = $body->data->user_from;$amount = $body->data->amount;$status = $body->data->status;$user_to = $body->data->user_to;if($status == 1){$db1 = Db::connect('db1');$db1->startTrans();try {$db1->execute('INSERT INTO think_message_supply(msg_id,user_from,amount,status,user_to) VALUES (?,?,?,?,?)',[$msg_id,$user_from,$amount,$status,$user_to]);$db1->commit();} catch (\Exception $e){$db1->rollback();$this->response = json(['success'=>0,'msg'=>"First Transaction db1 has failed."]);$this->pass_msg1 = false;}} } else {$this->response = json(['success'=>0,'msg'=>"First Transaction db2 has failed."]);$this->pass_msg1 = false;}}}/*** When a message appears, it checks the correlation_id property. If it* matches the value from the request it returns the response to the* application.** @param AMQPMessage $rep*/public function onCallback2(AMQPMessage $rep) {if($rep->get('correlation_id') == $this->corr_id) {$this->waiting2 = true;$body = json_decode($rep->body);if( $body->success == true){ $msg_id = $body->data->msg_id;$user_id = $body->data->user_from;$amount = $body->data->amount;$status = $body->data->status;if($status == 2){$db1 = Db::connect('db1');$db1->startTrans();try {$db1->execute('UPDATE think_message_supply a SET a.status = ? WHERE a.msg_id = ?',[$status,$msg_id]);$db1->execute('UPDATE think_account b SET b.amount = b.amount - ? WHERE b.user_id = ?',[$amount,$user_id]);$db1->commit();} catch (\Exception $e){$db1->rollback();file_put_contents(APP_PATH.'bank-data/data.csv', $rep->body.' ## '.date('Y-m-d H:i:s',time()).PHP_EOL, FILE_APPEND);$this->response = json(['success'=>1,'msg'=>"Avoid to rededuct money,now you must contact our customer."]);$this->pass_msg2 = false;}if($this->pass_msg2 == true){$this->response = json(['success'=>1,'msg'=>'Bank Transaction is successfull.']);}} } else {$this->response = json(['success'=>0,'msg'=>"Second Transaction db2 has failed."]);}}}}

看看基于实时消息的分布式数据库代码的执行流程吧,很详细直观,按照箭头标识的圆圈序号阅读实现流程:

图片比较大显示到网页页面缩小了比例,导致不清晰,请将如下2张流程图复制到画图工具里面阅读

顺便提一下这些文件里的参数值都跟系统的性能密切相关

[root@contoso ~]# ll /proc/sys/net/ipv4

total 0

-rw-r--r-- 1 root root 0 Aug 29 21:50 cipso_cache_bucket_size

-rw-r--r-- 1 root root 0 Aug 29 21:50 cipso_cache_enable

-rw-r--r-- 1 root root 0 Aug 29 21:50 cipso_rbm_optfmt

-rw-r--r-- 1 root root 0 Aug 29 21:50 cipso_rbm_strictvalid

dr-xr-xr-x 1 root root 0 Aug 28 22:24 conf

-rw-r--r-- 1 root root 0 Aug 29 21:50 icmp_echo_ignore_all

-rw-r--r-- 1 root root 0 Aug 29 21:50 icmp_echo_ignore_broadcasts

-rw-r--r-- 1 root root 0 Aug 29 21:50 icmp_errors_use_inbound_ifaddr

-rw-r--r-- 1 root root 0 Aug 29 21:50 icmp_ignore_bogus_error_responses

-rw-r--r-- 1 root root 0 Aug 29 21:50 icmp_ratelimit

-rw-r--r-- 1 root root 0 Aug 29 21:50 icmp_ratemask

-rw-r--r-- 1 root root 0 Aug 29 21:50 igmp_max_memberships

-rw-r--r-- 1 root root 0 Aug 29 21:50 igmp_max_msf

-rw-r--r-- 1 root root 0 Aug 29 21:50 igmp_qrv

-rw-r--r-- 1 root root 0 Aug 29 21:50 inet_peer_maxttl

-rw-r--r-- 1 root root 0 Aug 29 21:50 inet_peer_minttl

-rw-r--r-- 1 root root 0 Aug 29 21:50 inet_peer_threshold

-rw-r--r-- 1 root root 0 Aug 29 21:50 ip_default_ttl

-rw-r--r-- 1 root root 0 Aug 29 21:50 ip_dynaddr

-rw-r--r-- 1 root root 0 Aug 29 21:50 ip_early_demux

-rw-r--r-- 1 root root 0 Aug 28 22:24 ip_forward

-rw-r--r-- 1 root root 0 Aug 29 21:50 ip_forward_use_pmtu

-rw-r--r-- 1 root root 0 Aug 29 21:50 ipfrag_high_thresh

-rw-r--r-- 1 root root 0 Aug 29 21:50 ipfrag_low_thresh

-rw-r--r-- 1 root root 0 Aug 29 21:50 ipfrag_max_dist

-rw-r--r-- 1 root root 0 Aug 29 21:50 ipfrag_secret_interval

-rw-r--r-- 1 root root 0 Aug 29 21:50 ipfrag_time

-rw-r--r-- 1 root root 0 Aug 29 21:50 ip_local_port_range

-rw-r--r-- 1 root root 0 Aug 29 21:50 ip_local_reserved_ports

-rw-r--r-- 1 root root 0 Aug 29 21:50 ip_nonlocal_bind

-rw-r--r-- 1 root root 0 Aug 29 21:50 ip_no_pmtu_disc

dr-xr-xr-x 1 root root 0 Aug 29 21:50 neigh

-rw-r--r-- 1 root root 0 Aug 29 21:50 ping_group_range

dr-xr-xr-x 1 root root 0 Aug 29 21:50 route

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_abort_on_overflow

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_adv_win_scale

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_allowed_congestion_control

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_app_win

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_autocorking

-r--r--r-- 1 root root 0 Aug 29 21:50 tcp_available_congestion_control

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_base_mss

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_challenge_ack_limit

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_congestion_control

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_dsack

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_early_retrans

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_ecn

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_fack

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_fastopen

-rw------- 1 root root 0 Aug 29 21:50 tcp_fastopen_key

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_fin_timeout

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_frto

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_invalid_ratelimit

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_keepalive_intvl

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_keepalive_probes

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_keepalive_time

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_limit_output_bytes

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_low_latency

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_max_orphans

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_max_ssthresh

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_max_syn_backlog

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_max_tw_buckets

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_mem

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_min_tso_segs

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_moderate_rcvbuf

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_mtu_probing

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_no_metrics_save

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_notsent_lowat

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_orphan_retries

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_reordering

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_retrans_collapse

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_retries1

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_retries2

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_rfc1337

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_rmem

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_sack

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_slow_start_after_idle

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_stdurg

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_synack_retries

-rw-r--r-- 1 root root 0 Aug 28 22:24 tcp_syncookies

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_syn_retries

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_thin_dupack

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_thin_linear_timeouts

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_timestamps

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_tso_win_divisor

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_tw_recycle

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_tw_reuse

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_window_scaling

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_wmem

-rw-r--r-- 1 root root 0 Aug 29 21:50 tcp_workaround_signed_windows

-rw-r--r-- 1 root root 0 Aug 29 21:50 udp_mem

-rw-r--r-- 1 root root 0 Aug 29 21:50 udp_rmem_min

-rw-r--r-- 1 root root 0 Aug 29 21:50 udp_wmem_min

-rw-r--r-- 1 root root 0 Aug 29 21:50 xfrm4_gc_thresh

[root@contoso ~]#

cat /proc/sys/net/ipv4/tcp_syn_retries ##默认6

cat /proc/sys/net/ipv4/tcp_synack_retries ##默认5

cat /proc/sys/net/ipv4/tcp_keepalive_time ##默认7200

cat /proc/sys/net/ipv4/tcp_keepalive_probes ##默认9

cat /proc/sys/net/ipv4/tcp_keepalive_intvl ##默认75

cat /proc/sys/net/ipv4/tcp_retries2 ##默认15

cat /proc/sys/net/ipv4/tcp_fin_timeout ##默认60

cat /proc/sys/net/ipv4/tcp_max_tw_buckets ##默认65536

cat /proc/sys/net/ipv4/tcp_tw_recycle ##默认0

cat /proc/sys/net/ipv4/tcp_tw_reuse ##默认0

cat /proc/sys/net/ipv4/tcp_max_orphans ##默认65536

cat /proc/sys/net/ipv4/tcp_syncookies ##默认0

cat /proc/sys/net/ipv4/tcp_max_syn_backlog ##默认512

cat /proc/sys/net/ipv4/tcp_wmem ##默认4096 16384 4194304

cat /proc/sys/net/ipv4/tcp_rmem ##默认4096 87380 6291456

cat /proc/sys/net/ipv4/tcp_mem ##默认378486 504649 756972

cat /proc/sys/net/ipv4/ip_local_port_range ##默认32768 60999

以下几项在开启防火墙的情况下可以配置

systemctl start firewalld 这是启动防火墙服务

cat /proc/sys/net/ipv4/ip_conntrack_max ##默认

cat /proc/sys/net/ipv4/netfilter.ip_conntrack_max ##默认

cat /proc/sys/net/ipv4/netfilter.ip_conntrack_tcp_timeout_established ##默认

cat /proc/sys/net/ipv4/net.core.somaxconn ##默认

cat /proc/sys/net/ipv4/dev_max_backlog ##默认

优化系统内核参数的配置如下,前面已经配置过了,此处就不要再执行下面命令了,cat 编辑配置文件比 vim方便多了,超级喜欢这样玩,

再集中的贴一遍,哪天需要看看这个也比较集中:

[root@contoso ~]# cat > /etc/sysctl.conf

# sysctl settings are defined through files in

# /usr/lib/sysctl.d/, /run/sysctl.d/, and /etc/sysctl.d/.

#

# Vendors settings live in /usr/lib/sysctl.d/.

# To override a whole file, create a new file with the same in

# /etc/sysctl.d/ and put new settings there. To override

# only specific settings, add a file with a lexically later

# name in /etc/sysctl.d/ and put new settings there.

#

# For more information, see sysctl.conf(5) and sysctl.d(5).

net.ipv4.tcp_syn_retries = 1

net.ipv4.tcp_synack_retries = 1

net.ipv4.tcp_keepalive_time = 600

net.ipv4.tcp_keepalive_probes = 3

net.ipv4.tcp_keepalive_intvl = 15

net.ipv4.tcp_retries2 = 5

net.ipv4.tcp_fin_timeout = 2

net.ipv4.tcp_max_tw_buckets = 36000

net.ipv4.tcp_tw_recycle = 1

net.ipv4.tcp_tw_reuse = 1

net.ipv4.tcp_max_orphans = 32768

net.ipv4.tcp_syncookies = 1

net.ipv4.tcp_max_syn_backlog = 16384

net.ipv4.tcp_wmem = 8192 131072 16777216

net.ipv4.tcp_rmem = 32768 131072 16777216

net.ipv4.tcp_mem = 786432 1048576 1572864

net.ipv4.ip_local_port_range = 1024 65000

[root@contoso ~]# sysctl -p

[root@contoso ~]# cat > /etc/security/limits.conf

# /etc/security/limits.conf

#

#This file sets the resource limits for the users logged in via PAM.

#It does not affect resource limits of the system services.

#

#Also note that configuration files in /etc/security/limits.d directory,

#which are read in alphabetical order, override the settings in this

#file in case the domain is the same or more specific.

#That means for example that setting a limit for wildcard domain here

#can be overriden with a wildcard setting in a config file in the

#subdirectory, but a user specific setting here can be overriden only

#with a user specific setting in the subdirectory.

#

#Each line describes a limit for a user in the form:

#

#<domain> <type> <item> <value>

#

#Where:

#<domain> can be:

# - a user name

# - a group name, with @group syntax

# - the wildcard *, for default entry

# - the wildcard %, can be also used with %group syntax,

# for maxlogin limit

#

#<type> can have the two values:

# - "soft" for enforcing the soft limits

# - "hard" for enforcing hard limits

#

#<item> can be one of the following:

# - core - limits the core file size (KB)

# - data - max data size (KB)

# - fsize - maximum filesize (KB)

# - memlock - max locked-in-memory address space (KB)

# - nofile - max number of open file descriptors

# - rss - max resident set size (KB)

# - stack - max stack size (KB)

# - cpu - max CPU time (MIN)

# - nproc - max number of processes

# - as - address space limit (KB)

# - maxlogins - max number of logins for this user

# - maxsyslogins - max number of logins on the system

# - priority - the priority to run user process with

# - locks - max number of file locks the user can hold

# - sigpending - max number of pending signals

# - msgqueue - max memory used by POSIX message queues (bytes)

# - nice - max nice priority allowed to raise to values: [-20, 19]

# - rtprio - max realtime priority

#

#<domain> <type> <item> <value>

#

#* soft core 0

#* hard rss 10000

#@student hard nproc 20

#@faculty soft nproc 20

#@faculty hard nproc 50

#ftp hard nproc 0

#@student - maxlogins 4

# End of file

* soft nofile 65536

* hard nofile 65536

[root@contoso ~]#ulimit -n 65536

接下来配置httpd 服务器的最大并发数,前面已经配置过了,此处跳过配置httpd的最大并发数:

[root@contoso ~]# cat > /etc/httpd/conf.modules.d/00-mpm.conf

# Select the MPM module which should be used by uncommenting exactly

# one of the following LoadModule lines:

# prefork MPM: Implements a non-threaded, pre-forking web server

# See: /docs/2.4/mod/prefork.html

LoadModule mpm_prefork_module modules/mod_mpm_prefork.so

<IfModule mpm_prefork_module>

StartServers 100

MinSpareServers 100

MaxSpareServers 120

ServerLimit 3000

MaxClients 3000

MaxRequestsPerChild 500000

</IfModule>

# worker MPM: Multi-Processing Module implementing a hybrid

# multi-threaded multi-process web server

# See: /docs/2.4/mod/worker.html

#

#LoadModule mpm_worker_module modules/mod_mpm_worker.so

# event MPM: A variant of the worker MPM with the goal of consuming

# threads only for connections with active processing

# See: /docs/2.4/mod/event.html

#

#LoadModule mpm_event_module modules/mod_mpm_event.so

[root@contoso ~]#

创建数据库db1,初始化测试数据表

CREATE DATABASE IF NOT EXISTS `db1` DEFAULT CHARACTER SET utf8 ;

USE `db1`;

DROP TABLE IF EXISTS `think_account`;

CREATE TABLE `think_account` (

`user_id` int(4) NOT NULL,

`amount` decimal(18,2) DEFAULT NULL,

PRIMARY KEY (`user_id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert into `think_account`(`user_id`,`amount`) values (1,'9999851572743.00'),(3,'9999999999999.00'),(5,'9999999999999.00');

DROP TABLE IF EXISTS `think_message_supply`;

CREATE TABLE `think_message_supply` (

`msg_id` varchar(40) NOT NULL,

`user_from` int(11) DEFAULT NULL,

`amount` decimal(18,2) DEFAULT NULL,

`status` tinyint(4) DEFAULT 0,

`user_to` int(11) DEFAULT NULL,

`time` timestamp NULL DEFAULT current_timestamp(),

PRIMARY KEY (`msg_id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

创建数据库db2,初始化测试数据表

CREATE DATABASE IF NOT EXISTS `db2` DEFAULT CHARACTER SET utf8 ;

USE `db2`;

DROP TABLE IF EXISTS `think_account`;

CREATE TABLE `think_account` (

`user_id` int(4) NOT NULL,

`amount` decimal(18,2) DEFAULT NULL,

PRIMARY KEY (`user_id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert into `think_account`(`user_id`,`amount`) values (2,'0.00'),(4,'500.00'),(6,'500.00');

DROP TABLE IF EXISTS `think_message_apply`;

CREATE TABLE `think_message_apply` (

`msg_id` varchar(40) NOT NULL,

`user_from` int(11) DEFAULT NULL,

`amount` decimal(18,2) DEFAULT NULL,

`status` tinyint(4) DEFAULT 0,

`user_to` int(11) DEFAULT NULL,

`time` timestamp NULL DEFAULT current_timestamp(),

PRIMARY KEY (`msg_id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

ab工具的压力测试:

首先在主机192.168.10.20上用root账户登录,启动消费者监听,以下是启动命令:

[root@contoso ~]#cd /home/myth/www/think && php public/index.php index/Consumer/listen

测试500个并发,可能有点高了在虚拟机环境里,因为有请求丢失了(ab 工具因为socket接收数据出现错误,关闭了出现错误的请求连接,相当于这条请求没发送成功)

[myth@contoso ~]$ab -r -t 7200 -s 7200 -k -n 100000 -c 500 "/index/producer/transfer?account[user_from]=1&account[amount]=1024&account[status]=0&account[user_to]=2"

查看一下消息服务器 队列瞬间执行的情形:

看看系统的实时性能吧

看看启动的httpd进程个数吧(没有直接统计,而是把SecureCRT终端输出的全部httpd进程贴到NotePad++里滚屏截图(FastStone Capture)打印的) 进程总的个数 = 第623行 - 第3行 + 1 = 621个 httpd进程 空闲httpd个数是100个 + 500个并发 + 默认启动的1个httpd = 刚好等于621个httpd进程

删掉默认消息服务器上的所有数据,重新创建一个与如同新装后一摸一样的默认消息服务器:

[root@rabbitmq1 ~]#rabbitmqctl delete_vhost / && rabbitmqctl add_vhost / && rabbitmqctl set_permissions -p / guest '.*' '.*' '.*'

Deleting vhost "/"

Creating vhost "/"

Setting permissions for user "guest" in vhost "/"

[root@rabbitmq1 ~]#systemctl restart rabbitmq-server && rabbitmqadmin list exchanges

+--------------------+---------+

| name | type |

+--------------------+---------+

| | direct |

| amq.direct | direct |

| amq.fanout | fanout |

| amq.headers | headers |

| amq.match | headers |

| amq.rabbitmq.log | topic |

| amq.rabbitmq.trace | topic |

| amq.topic | topic |

+--------------------+---------+

[root@rabbitmq1 ~]#

读写文件的用户是httpd的用户apache 所以data.csv文件的读写权限是分配给apache用户的

如果你够细心runtime文件夹的读写权限也是apache 连root都无法写操作 要留意哦,PHP源码注释里我写这些东西不为了好看才写的,

有经验的你看到这个就要知道这是必须要执行终端命令

[root@contoso ~]# chown -R apache:apache /home/myth/www/think/apps/bank-data/data.csv && ll /home/myth/www/think/apps

[root@contoso ~]# chmod -R 0755 /home/myth/www/think/apps/bank-data && ll /home/myth/www/think/apps/bank-data

[root@contoso ~]# cat /home/myth/www/think/apps/bank-data/data.csv

GET /index/producer/transfer?account[user_from]=1&account[amount]=1024&account[status]=0&account[user_to]=2

[root@rabbitmq1 ~]# rabbitmqadmin list bindings

[root@rabbitmq1 ~]# rabbitmqadmin list queues

[root@rabbitmq1 ~]# rabbitmqadmin get queue=bank.transfers1 requeue=true count=30

[root@rabbitmq1 ~]# rabbitmqadmin get queue=bank.transfers2 requeue=true count=10

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。