Emqttd1:192.168.88.61 Emqttd2:192.168.88.62 Emqttd3:192.168.88.63 Redis:192.168.88.64 Haproxy:192.168.88.65
系统优化配置:
ulimit -n 1048576
sysctl -w fs.file-max=2097152
sysctl -w fs.nr_open=2097152
sysctl -w net.core.somaxconn=65535
一、解压、安装、修改配置:
$ unzip emqttd-centos6.8-v2.2.0.zip
$ cd emqttd
$ vi etc/emq.conf
#node.name
node.name =
## Cookie for distributed node
node.cookie = emq_dist_cookie_533d99ckd9ji475
## Erlang Process Limit
node.process_limit = 2000000
## Sets the maximum number of simultaneously existing ports for this system
node.max_ports = 1000000
## Size of acceptor pool
mqtt.listener.tcp.acceptors = 64
## Maximum number of concurrent clients
mqtt.listener.tcp.max_clients = 1000000
## Rate Limit. Format is 'burst,rate', Unit is KB/Sec
## mqtt.listener.tcp.rate_limit = 100,10
## TCP Socket Options
mqtt.listener.tcp.backlog = 262144
## Distributed node port range
node.dist_listen_min = 6000
node.dist_listen_max = 6999
如果需要启用防火墙,则上面两行去掉注释,注意下面的防火墙端口设置,要打开该段端口。
端口:1883:MQTT协议tcp端口,8883:MQTT(SSL) tcp端口,8083:MTQQ(websocket)、HTTP API端口,18083:dashboard管理控制WEB端口,4369:集群处理epmd端口,6000-6999由上面配置文件定义的epmd需要的端口范围。
启动:
$ cd emqttd
直接进入控制台模式
$ ./bin/emqttd console
$ ./bin/emqttd start (启动)
$ ./bin/emqttd_ctl status (查看状态)
$ ./bin/emqttd stop (停止)
查看各台的启动状态:
http:192.168.88.61:8083/status
http:192.168.88.61:18083/ user/password:admin/public
二、把节点加入集群:
在各个节点上执行(重复执行也没关系):
$ ./bin/emqttd_ctl cluster join
$ ./bin/emqttd_ctl cluster status
Cluster status:
'node1@192.168.88.61',
'node2@192.168.88.62']}]
把节点退出集群:
本机退出集群:
$ ./bin/emqttd_ctl cluster leave
把某节点退出集群:
$ ./bin/emqttd_clt cluster remove
三、测试:
下载安装MTQQ协议的客户端:
$ sudo rpm -ivh libmosquitto1-1.4.10-1.1.x86_64.rpm
$ sudo rpm -ivh mosquitto-clients-1.4.10-1.1.x86_64.rpm
订阅:
$ mosquitto_sub -h 192.168.88.62 -p 1883 -t test_topic_1 -q 1 -c -i 1111
带用户名和密码:
$ mosquitto_sub -h 192.168.88.62 -p 1883 -t test_topic_1 -q 1 -c -i 1111 -u user -P 123456
发布:另开一个ssh或者另外机器上执行:
$ mosquitto_pub -h 192.168.88.62 -p 1883 -t test_topic_1 -q 1 -m "hello mqtt 7"
带用户名和密码:
$ mosquitto_pub -h 192.168.88.62 -p 1883 -t test_topic_1 -q 1 -m "hello mqtt 7" -u user -P 123456
参数:
-h: 连接到哪台broker。
-p: 连接端口。
-t: topic名字,topic类似文件系统的组织方式,以"/"分隔符来分层,订阅者订阅时可以使用通配符"+"和"#",发布者不能使用通配符。
-q: qos级别,默认为0,共三个值:0:至多一次,不保证到达订阅者;1:至少一次,保证到达订阅者,但不保证不重复;2:正好一次,保证到达,又保证不重复;非钱类的应用使用qos1就可以了。
-c: 如果订阅者退出,在broker保留所有订阅的消息,一旦重新连接上,则把所有消息发给订阅者,就是持久性订阅(clean_sess=false)。
-i: 设置client id,在即时通信时可以设置为用户id或者用户名等具有唯一性的字段。
-r: 发布方使用的参数,保留消息(每个topic只保留最后一个这种消息,之前的会覆盖 ),即使该消息被一个订阅者读取了,还会一直保留在broker,如果有新的订阅者订阅该topic,则马上会收到该消息,类似qq里面的公告性消息,这类信息的删除,发送一个payload为空的null消息即可:$ mosquitto_pub -h 192.168.88.62 -p 1883 -t test_topic_1 -q 1 -r -n -u user -P 123456。
-m: 该topic的一条消息内容。
四、Redis 插件认证(先要安装redis)
Redis 认证。MQTT 用户记录存储在 Redis Hash, 键值: “mqtt_user:<Username>”
vi etc/plugins/emq_auth_redis.conf 设置 ‘super_cmd’、’auth_cmd’、’password_hash’:
## Redis Serverauth.redis.server = 127.0.0.1:6379
## Redis Pool Sizeauth.redis.pool = 8
## Redis Databaseauth.redis.database = 0
## Redis Password## auth.redis.password =
## Variables: %u = username, %c = clientid
## Authentication Query Commandauth.redis.auth_cmd = HGET mqtt_user:%u password
## Password hash: plain, md5, sha, sha256, pbkdf2auth.redis.password_hash = sha256
## Superuser Query Commandauth.redis.super_cmd = HGET mqtt_user:%u is_superuser
启用 Redis 认证插件:
./bin/emqttd_ctl plugins load emq_auth_redis
五、MySQL 插件认证
通过 MySQL 数据库表认证,可创建如下的 ‘mqtt_user’ 表:
CREATE TABLE `mqtt_user` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`username` varchar(100) DEFAULT NULL,
`password` varchar(100) DEFAULT NULL,
`salt` varchar(20) DEFAULT NULL,
`is_superuser` tinyint(1) DEFAULT 0,
`created` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `mqtt_username` (`username`)) ENGINE=MyISAM DEFAULT CHARSET=utf8;
vi etc/plugins/emq_auth_mysql.conf 配置 ‘super_query’, ‘auth_query’, ‘password_hash’:
## Mysql Serverauth.mysql.server = 127.0.0.1:3306
## Mysql Pool Sizeauth.mysql.pool = 8
## Mysql Username## auth.mysql.username =
## Mysql Password## auth.mysql.password =
## Mysql Databaseauth.mysql.database = mqtt
## Variables: %u = username, %c = clientid
## Authentication Query: select password onlyauth.mysql.auth_query = select password from mqtt_user where username = '%u' limit 1
## Password hash: plain, md5, sha, sha256, pbkdf2auth.mysql.password_hash = sha256
## %% Superuser Queryauth.mysql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1
注解
如果系统已有MQTT认证表,可通过配置’auth_query’查询语句集成。
启用 MySQL 认证插件:
./bin/emqttd_ctl plugins load emq_auth_mysql
六、使用Haproxy来实现负载分担:
因为emq的topic和消息在集群的各台服务器上一致,所以数据不能以增加机器的方式扩容,只能增加每台的内存,和客户端的连接则可以以增加机器方式扩容。
软件安装
[root@cacti src]# tar xzvf haproxy-1.4.24.tar.gz
[root@cacti haproxy-1.4.24]# make TARGET=linux26 PREFIX=/usr/local/haproxy
[root@cacti haproxy-1.4.24]# make install PREFIX=/usr/local/haproxy
配置文件:vi haproxy.cfg
global
log 127.0.0.1 local3
maxconn 20480
chroot /usr/local/haproxy
uid 1004
gid 1004
daemon
quiet
nbproc 1
pidfile /var/run/haproxy.pid
defaults
log global
mode http
maxconn 20480
option httplog
option httpclose
option forwardfor
option dontlognull
option redispatch
retries 3
balance roundrobin
contimeout 5000
clitimeout 50000
srvtimeout 50000
listen emq_emqttd *:1883
mode tcp
balance source
log global
timeout connect 25s
timeout client 0
timeout server 0
option tcpka
option tcplog
#option tcp-check
server emq1 192.168.88.61:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25
server emq2 192.168.88.62:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25
server emq3 192.168.88.63:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25
listen status 192.168.88.65:8080
stats enable
stats uri /stats
stats auth admin:123456
stats realm (Haproxy\ statistic)
创建haproxy启动脚本:
#!/bin/bash
#
# haproxy
#
# chkconfig: 35 85 15
# description: HAProxy is a free, very fast and reliable solution \
# offering high availability, load balancing, and \
# proxying for TCP and HTTP-based applications
# processname: haproxy
# config: /etc/haproxy.cfg
# pidfile: /var/run/haproxy.pid
# Source function library.
. /etc/rc.d/init.d/functions
# Source networking configuration.
. /etc/sysconfig/network
# Check that networking is up.
[ "$NETWORKING" = "no" ] && exit 0
config="/etc/haproxy.cfg"
exec="/usr/local/haproxy/sbin/haproxy"
prog=$(basename $exec)
[ -e /etc/sysconfig/$prog ] && . /etc/sysconfig/$prog
lockfile=/var/lock/subsys/haproxy
check() {
$exec -c -V -f $config
}
start() {
$exec -c -q -f $config
if [ $? -ne 0 ]; then
echo "Errors in configuration file, check with $prog check."
return 1
fi
echo -n $"Starting $prog: "
# start it up here, usually something like "daemon $exec"
daemon $exec -D -f $config -p /var/run/$prog.pid
retval=$?
echo
[ $retval -eq 0 ] && touch $lockfile
return $retval
}
stop() {
echo -n $"Stopping $prog: "
# stop it here, often "killproc $prog"
killproc $prog
retval=$?
echo
[ $retval -eq 0 ] && rm -f $lockfile
return $retval
}
restart() {
$exec -c -q -f $config
if [ $? -ne 0 ]; then
echo "Errors in configuration file, check with $prog check."
return 1
fi
stop
start
}
reload() {
$exec -c -q -f $config
if [ $? -ne 0 ]; then
echo "Errors in configuration file, check with $prog check."
return 1
fi
echo -n $"Reloading $prog: "
$exec -D -f $config -p /var/run/$prog.pid -sf $(cat /var/run/$prog.pid)
retval=$?
echo
return $retval
}
force_reload() {
restart
}
fdr_status() {
status $prog
}
case "$1" in
start|stop|restart|reload)
$1
;;
force-reload)
force_reload
;;
checkconfig)
check
;;
status)
fdr_status
;;
condrestart|try-restart)
[ ! -f $lockfile ] || restart
;;
*)
echo $"Usage: $0 {start|stop|status|checkconfig|restart|try-restart|reload|force-reload}"
exit 2
esac
启动
[root@cacti etc]# /etc/init.d/haproxy start
Starting haproxy: [ OK ]
[root@cacti etc]# ps -ef | grep haproxy
100 2305 1 0 17:55 ? 00:00:00 /usr/local/sbin/haproxy -D -f /etc/haproxy.cfg -p /var/run/haproxy.pid
root 2308 774 0 17:55 pts/0 00:00:00 grep haproxy
如果没写启动脚本,可以用下方式启动:
/usr/local/haproxy/sbin/haproxy –f /usr/local/haproxy/etc/haproxy.cft