redis读写分离数据延迟负载均衡demo分为3个文件config配置文件input日志文件redis操作文件index入口文件
1.index入口文件
<?php
require_once "./config.php";
require_once "./redis.php";
// 在swoole事件中 echo 和 var_dump是输出在 控制台 不是浏览器
$http = new Swoole\Http\Server("0.0.0.0", 9501);
// 设置swoole进程个数
$http->set([
'worker_num' => 1
]);
// 在创建的时候执行 ; 进程创建的时候触发时候
// 理解为一个构造函数,初始化
$http->on('workerStart', function ($server, $worker_id) use($config){
global $redisMS;
$redisMS = new RedisMS($config);
});
// 通过浏览器访问 http://本机ip :9501会执行的代码
$http->on('request', function ($request, $response) {
global $redisMS;
/*
$request->get = [
'type' => '操作类型 1写,2读' ,
'method' => '操作的方法',
'params' => []
]
*/
$ret = 'p';
if ($request->get['type'] == 1) {
$ret = $redisMS->runCall($request->get['method'], explode(',', $request->get['params']));
} else {
// 读
$ret = $redisMS->runCall('get', [$request->get['params']]);
}
$response->end($ret);
});
$http->start();2.redis操作文件
<?php
require 'Input.php';
/**
* redis 基于c写的
* predis 基于php扩展
*/
class RedisMS
{
protected $config;
/**
* 记录redis连接
* [
* "master" => \\Redis,
* "slaves "=> [
* 'slaveIP1:port' => \Redis
* 'slaveIP2:port' => \Redis
* 'slaveIP3:port' => \Redis
* ]
* ]
*/
protected $connections;
/**
* [
* "0" => 'slaveIP1:port' ,
* "1" => 'slaveIP2:port',
* "2" => 'slaveIP3:port',
* ]
*/
protected $connSlaveIndexs;
protected $call = [
'write' => [
'set',
'sadd'
// ..
],
'read' => [
'get',
'smembers'
//..
],
];
public function __construct($config)
{
if ($config["is_ms"]) {
$this->connections['master'] = $this->getRedis($config['master']['host'], $config['master']['port']);
$this->createSlave($config['slaves']);
Input::info($this->connections, "这是获取的连接");
Input::info($this->connSlaveIndexs, "这是连接的下标");
$this->maintain();
}
$this->config = $config;
}
// --------------主从维护--------------------
/**
* 去维护从节点列表
*
* 重整 1台服务器,多个从节点
*/
protected function maintain()
{
/*
1. 获取主节点连接信息
2. 获取从节点的偏移量
3. 获取连接个数
3.1 偏移量的计算
3.2 维护列表
*/
$masterRedis = $this->getMaster();
swoole_timer_tick(2000, function ($timer_id) use($masterRedis){
// 得到主节点的连接信息
$replInfo = $masterRedis->info('replication');
// Input::info($replInfo, "复制信息");
// 得到主节点偏移量
$masterOffset = $replInfo['master_repl_offset'];
// 记录新增的从节点
$slaves = [];
for ($i=0; $i < $replInfo['connected_slaves']; $i++) {
// 获取slave的信息
$slaveInfo = $this->stringToArr($replInfo['slave'.$i]);
$slaveFlag = $this->redisFlag($slaveInfo['ip'], $slaveInfo['port']);
// 延迟检测
if (($masterOffset - $slaveInfo['offset']) < 100) {
// 是正常范围
// 如果之前因为网络延迟删除了节点,现在恢复了网络 -》新增
// 这是动态新增
if (!in_array($slaveFlag, $this->connSlaveIndexs)) {
$slaves[$slaveFlag] = [
'host' => $slaveInfo['ip'],
'port' => $slaveInfo['port']
];
Input::info($slaveFlag, "新增从节点");
}
} else {
// 延迟 -> 删除节点
Input::info($slaveFlag, "删除节点");
unset($this->connections['slaves'][$slaveFlag]);
}
}
$this->createSlave($slaves);
});
}
protected function stringToArr($str, $flag1 = ',', $flag2 = '=')
{
// "ip=192.160.1.130,port=6379,state=online,offset=72574,lag=0"
$arr = explode($flag1, $str);
$ret = [];
// $key ip
// $value 192.160.1.130
foreach ($arr as $key => $value) {
$arr2 = explode($flag2, $value);
$ret[$arr2[0]] = $arr2[1];
}
return $ret;
}
// --------------创建主从连接--------------------
/**
* $slaves = [
* 'slave1' => [
* 'host' => '192.160.1.130',
* 'port' => 6379
* ],
* 'slave2' => [
* 'host' => '192.160.1.140',
* 'port' => 6379
* ]
* ]
*
*/
private function createSlave($slaves)
{
// var_dump($slaves);
// 这个是用于做负载的时候选择从节点对象
foreach ($slaves as $key => $slave) {
$this->connections['slaves'][$this->redisFlag($slave['host'], $slave['port'])] = $this->getRedis($slave['host'], $slave['port']);
}
// 记录从节点的下标
$this->connSlaveIndexs = array_keys($this->connections['slaves']);
}
private function redisFlag($host, $port)
{
return $host.":".$port;
}
public function getRedis($host, $port)
{
$redis = new \Redis();
$redis->pconnect($host, $port);
return $redis;
}
public function getConnSlaveIndexs()
{
return $this->connSlaveIndexs;
}
// --------------获取主从连接方法--------------------
public function getMaster()
{
return $this->connections['master'];
}
public function getSlaves()
{
return $this->connections['slaves'];
}
public function oneSlave()
{
$indexs = $this->connSlaveIndexs;
$i = mt_rand(0, count($indexs) - 1);
return $this->connections['slaves'][$indexs[$i]];
// $slaves = $this->getSlaves();
// // 对于所有从节点 负载均衡算法
// $i = mt_rand(0, count($slaves) - 1);
// return $slaves[$i];
}
// --------------执行命令方法--------------------
public function runCall($command, $params = [])
{
try {
if ($this->config['is_ms']) {
// 获取操作的对象(是主还是从)
$redis = $this->getRedisCall($command);
// var_dump($redis);
return $redis->{$command}(...$params);
}
} catch (\Exception $e) {}
}
/**
* 判断操作类型
* @param [type] $command [description]
* @return boolean [description]
*/
protected function getRedisCall($command)
{
if (in_array($command, $this->call['write'])) {
return $this->getMaster();
} else if (in_array($command, $this->call['read'])){
return $this->oneSlave();
} else {
throw new \Exception("不支持");
}
}
}
/*
1. 主节点连接,从节点
2. 对于主节点连接和从节点
3. 写命令 -》
3.1 -》 判断类型
3.2 -》 主
3.3 -》 从(从是有多个节点)
3.3.4 -》 负载均衡(随机)
*/3.input日志文件
<?php
class Input
{
public static function info($message, $description = null)
{
echo "======>>> ".$description." start\n";
if (\is_array($message)) {
echo \var_export($message, true);
} else if (\is_string($message)) {
echo $message."\n";
} else {
var_dump($message);
}
echo "======>>> ".$description." end\n";
}
}4.conf配置文件
<?php $config = [ // 做个判断是否开启主从 'is_ms' => true, 'master' => [ 'host' => '192.160.1.150', 'port' => 6379 ], 'slaves' => [ 'slave1' => [ 'host' => '192.160.1.130', 'port' => 6379 ], 'slave2' => [ 'host' => '192.160.1.140', 'port' => 6379 ] ], ];
本文由:xiaoshu168.com 作者:xiaoshu发表,转载请注明来源!