EasySwoole RPC
很多傳統的 Phper
并不懂 RPC
是什么,RPC
全稱 Remote Procedure Call
,中文譯為 遠程過程調用
,其實你可以把它理解為是一種架構性上的設計,或者是一種解決方案。
例如在某龐大商場系統中,你可以把整個商場拆分為 N
個微服務(理解為 N
個獨立的小模塊也行),例如:
- 訂單系統
- 用戶管理系統
- 商品管理系統
- 等等
那么在這樣的架構中,就會存在一個 API 網關
的概念,或者是叫 服務集成者
。我的 API 網關
的職責,就是把一個請求,拆分成 N
個小請求,分發到各個小服務里面,再整合各個小服務的結果,返回給用戶。例如在某次下單請求中,那么大概發送的邏輯如下:
- API 網關接受請求
- API 網關提取用戶參數,請求用戶管理系統,獲取用戶余額等信息,等待結果
- API 網關提取商品參數,請求商品管理系統,獲取商品剩余庫存和價格等信息,等待結果
- API 網關融合用戶管理系統、商品管理系統的返回結果,進行下一步調用(假設滿足購買條件)
- API 網關調用用戶管理信息系統進行扣款,調用商品管理系統進行庫存扣減,調用訂單系統進行下單(事務邏輯和撤回可以用
請求 id
保證,或者自己實現其他邏輯調度) - API 網關返回綜合信息給用戶
而在以上發生的行為,就稱為 遠程過程調用
。而調用過程實現的通訊協議可以有很多,比如常見的 HTTP
協議。而 EasySwoole RPC
采用自定義短鏈接的 TCP
協議實現,每個請求包,都是一個 JSON
,從而方便實現跨平臺調用。
全新特性
- 協程調度
- 服務自動發現
- 服務熔斷
- 服務降級
- Openssl 加密
- 跨平臺、跨語言支持
- 支持接入第三方注冊中心
安裝
composer require easyswoole/rpc=4.x
執行流程
服務端:
注冊RPC服務,創建相應的服務swoole table表(ps:記錄調用成功和失敗的次數)
注冊worker,tick進程
woker進程監聽:
客戶端發送請求->解包成相對應的格式->執行對應的服務->返回結果->客戶端
tick進程:
注冊定時器發送心跳包到本節點管理器
啟用廣播:每隔幾秒發送本節點各個服務信息到其他節點
啟用監聽:監聽其他節點發送的信息,發送相對應的命令(心跳|下線)到節點管理器處理
進程關閉:主動刪除本節點的信息,發送下線廣播到其他節點
Rpc-Server
場景
例如在一個商場系統中,我們將商品庫和系統公告兩個服務切分開到不同的服務器當中。當用戶打開商場首頁的時候, 我們希望App向某個網關發起請求,該網關可以自動的幫我們請求商品列表和系統公共等數據,合并返回。
服務定義
每一個Rpc服務其實就一個EasySwoole\Rpc\AbstractService類。 如下:
定義商品服務
namespace App\RpcService;
use EasySwoole\Rpc\AbstractService;
class Goods extends AbstractService
{
/**
* 重寫onRequest(比如可以對方法做ip攔截或其它前置操作)
*
* @param string $action
* @return bool
* CreateTime: 2020/6/20 下午11:12
*/
protected function onRequest(?string $action): ?bool
{
return true;
}
public function serviceName(): string
{
return 'goods';
}
public function list()
{
$this->response()->setResult([
[
'goodsId'=>'100001',
'goodsName'=>'商品1',
'prices'=>1124
],
[
'goodsId'=>'100002',
'goodsName'=>'商品2',
'prices'=>599
]
]);
$this->response()->setMsg('get goods list success');
}
}
定義公共服務
namespace App\RpcService;
use EasySwoole\Rpc\AbstractService;
class Common extends AbstractService
{
public function serviceName(): string
{
return 'common';
}
public function mailBox()
{
$this->response()->setResult([
[
'mailId'=>'100001',
'mailTitle'=>'系統消息1',
],
[
'mailId'=>'100001',
'mailTitle'=>'系統消息1',
],
]);
$this->response()->setMsg('get mail list success');
}
public function serverTime()
{
$this->response()->setResult(time());
$this->response()->setMsg('get server time success');
}
}
服務注冊
在Easyswoole
全局的Event
文件中,進行服務注冊。至于節點管理、服務類定義等具體用法請看對應章節。
namespace EasySwoole\EasySwoole;
use App\RpcService\Common;
use App\RpcService\Goods;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\Http\Request;
use EasySwoole\Http\Response;
use EasySwoole\Redis\Config\RedisConfig;
use EasySwoole\RedisPool\RedisPool;
use EasySwoole\Rpc\NodeManager\RedisManager;
use EasySwoole\Rpc\Config as RpcConfig;
use EasySwoole\Rpc\Rpc;
class EasySwooleEvent implements Event
{
public static function initialize()
{
// TODO: Implement initialize() method.
date_default_timezone_set('Asia/Shanghai');
}
public static function mainServerCreate(EventRegister $register)
{
/*
* 定義節點Redis管理器
*/
$redisPool = new RedisPool(new RedisConfig([
'host'=>'127.0.0.1'
]));
$manager = new RedisManager($redisPool);
//配置Rpc實例
$config = new RpcConfig();
//這邊用于指定當前服務節點ip,如果不指定,則默認用UDP廣播得到的地址
$config->setServerIp('127.0.0.1');
$config->setNodeManager($manager);
/*
* 配置初始化
*/
Rpc::getInstance($config);
//添加服務
Rpc::getInstance()->add(new Goods());
Rpc::getInstance()->add(new Common());
Rpc::getInstance()->attachToServer(ServerManager::getInstance()->getSwooleServer());
}
public static function onRequest(Request $request, Response $response): bool
{
// TODO: Implement onRequest() method.
return true;
}
public static function afterRequest(Request $request, Response $response): void
{
// TODO: Implement afterAction() method.
}
}
為了方便測試,我把兩個服務放在同一臺機器中注冊。實際生產場景應該是N臺機注冊商品服務,N臺機器注冊公告服務,把服務分開。
Rpc-Client
控制器聚合調用
namespace App\HttpController;
use EasySwoole\Http\AbstractInterface\Controller;
use EasySwoole\Rpc\Response;
use EasySwoole\Rpc\Rpc;
class Index extends Controller
{
function index()
{
$ret = [];
$client = Rpc::getInstance()->client();
/*
* 調用商品列表
*/
$client->addCall('goods','list',['page'=>1])
->setOnSuccess(function (Response $response)use(&$ret){
$ret['goods'] = $response->toArray();
})->setOnFail(function (Response $response)use(&$ret){
$ret['goods'] = $response->toArray();
});
/*
* 調用信箱公共
*/
$client->addCall('common','mailBox')
->setOnSuccess(function (Response $response)use(&$ret){
$ret['mailBox'] = $response->toArray();
})->setOnFail(function (Response $response)use(&$ret){
$ret['mailBox'] = $response->toArray();
});
/*
* 獲取系統時間
*/
$client->addCall('common','serverTime')
->setOnSuccess(function (Response $response)use(&$ret){
$ret['serverTime'] = $response->toArray();
});
$client->exec(2.0);
$this->writeJson(200,$ret);
}
}
注意,控制器中可以這樣調用,是因為服務端章節中,在EasySwoole的全局啟動事件已經對當前的Rpc實例定義注冊了節點管理器。因此在控制器中調用的時候 該Rpc實例可以找到對應的節點。一般來說,在做聚合網關的節點,是不需要注冊服務進去的,僅需注冊節點管理器即可。
客戶端
當rpc服務和客戶端不在同一服務中時,并且服務端客戶端使用的都是es
<?php
require_once 'vendor/autoload.php';
use EasySwoole\Rpc\Config;
use EasySwoole\Rpc\Rpc;
use EasySwoole\Rpc\NodeManager\RedisManager;
use EasySwoole\Rpc\Response;
$redisConfig = new \EasySwoole\Redis\Config\RedisConfig();
$redisConfig->setHost('127.0.0.1'); // 服務端使用的redis節點地址
$redisConfig->setPort('6379'); // 服務端使用的redis節點端口
$pool=new \EasySwoole\RedisPool\RedisPool($redisConfig);
$config = new Config();
$config->setServerIp('127.0.0.1'); // 指定rpc服務地址
$config->setListenPort(9502); // 指定rpc服務端口
$config->setNodeManager(new RedisManager($pool));
$rpc = new Rpc($config);
\Swoole\Coroutine::create(function () use ($rpc) {
$client = $rpc->client();
$client->addCall('UserService', 'register', ['arg1', 'arg2'])
->setOnFail(function (Response $response) {
print_r($response->toArray());
})
->setOnSuccess(function (Response $response) {
print_r($response->toArray());
});
$client->exec();
});
swoole_timer_clear_all();
跨平臺
Rpc
的請求響應通過tcp
協議,服務廣播使用udp
協議,我們只需要實現網絡協議即可。
PHP示例代碼
<?php
/**
* Created by PhpStorm.
* User: xcg
* Date: 2019/6/17
* Time: 14:30
*/
$data = [
'command' => 1,//1:請求,2:狀態rpc 各個服務的狀態
'request' => [
'serviceName' => 'UserService',
'action' => 'register',//行為名稱
'arg' => [
'args1' => 'args1',
'args2' => 'args2'
]
]
];
//$raw = serialize($data);//注意序列化類型,需要和RPC服務端約定好協議 $serializeType
$raw = json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
$fp = stream_socket_client('tcp://127.0.0.1:9600');
fwrite($fp, pack('N', strlen($raw)) . $raw);//pack數據校驗
$data = fread($fp, 65533);
//做長度頭部校驗
$len = unpack('N', $data);
$data = substr($data, '4');
if (strlen($data) != $len[1]) {
echo 'data error';
} else {
$data = json_decode($data, true);
// //這就是服務端返回的結果,
var_dump($data);//默認將返回一個response對象 通過$serializeType修改
}
fclose($fp);
Go示例代碼
package main
import (
"encoding/binary"
"net"
)
func main() {
var tcpAddr *net.TCPAddr
tcpAddr,_ = net.ResolveTCPAddr("tcp","127.0.0.1:9600")
conn,_ := net.DialTCP("tcp",nil,tcpAddr)
defer conn.Close()
sendEasyswooleMsg(conn)
}
func sendEasyswooleMsg(conn *net.TCPConn) {
var sendData []byte
data := `{"command":1,"request":{"serviceName":"UserService","action":"register","arg":{"args1":"args1","args2":"args2"}}}`
b := []byte(data)
// 大端字節序(網絡字節序)大端就是將高位字節放到內存的低地址端,低位字節放到高地址端。
// 網絡傳輸中(比如TCP/IP)低地址端(高位字節)放在流的開始,對于2個字節的字符串(AB),傳輸順序為:A(0-7bit)、B(8-15bit)。
sendData = int32ToBytes8(int32(len(data)))
// 將數據byte拼裝到sendData的后面
for _, value := range b {
sendData = append(sendData, value)
}
conn.Write(sendData)
}
func int32ToBytes8(n int32) []byte {
var buf = make([]byte, 4)
binary.BigEndian.PutUint32(buf, uint32(n))
return buf
}
Java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
public class Main {
public static void main(String[] args) throws IOException {
byte[] msg = "{\"command\":1,\"request\":{\"serviceName\":\"UserService\",\"action\":\"register\",\"arg\":{\"args1\":\"args1\",\"args2\":\"args2\"}}}".getBytes();
byte[] head = Main.toLH(msg.length);
byte[] data = Main.mergeByteArr(head, msg);
//創建Socket對象,連接服務器
Socket socket=new Socket("127.0.0.1",9600);
//通過客戶端的套接字對象Socket方法,獲取字節輸出流,將數據寫向服務器
OutputStream out=socket.getOutputStream();
out.write(data);
//讀取服務器發回的數據,使用socket套接字對象中的字節輸入流
InputStream in=socket.getInputStream();
byte[] response=new byte[1024];
int len=in.read(response);
System.out.println(new String(response,4, len-4));
socket.close();
}
static byte[] toLH(int n) {
byte[] b = new byte[4];
b[3] = (byte) (n & 0xff);
b[2] = (byte) (n >> 8 & 0xff);
b[1] = (byte) (n >> 16 & 0xff);
b[0] = (byte) (n >> 24 & 0xff);
return b;
}
static byte[] mergeByteArr(byte[] a, byte[] b) {
byte[] c= new byte[a.length + b.length];
System.arraycopy(a, 0, c, 0, a.length);
System.arraycopy(b, 0, c, a.length, b.length);
return c;
}
}
其他語言只需要實現tcp協議即可
EasySwoole RPC 自定義注冊中心
EasySwoole
默認為通過UDP
廣播+自定義進程定時刷新自身節點信息的方式來實現無主化/注冊中心的服務發現。在服務正常關閉的時候,自定義定時進程的onShutdown
方法會執行deleteServiceNode
方法來實現節點下線。在非正常關閉的時候,心跳超時也會被節點管理器踢出。
有些情況,不方便用UDP
廣播的情況下,那么EasySwoole
支持你自定義一個節點管理器,來變更服務發現方式。
例如用Redis來實現
namespace EasySwoole\Rpc\NodeManager;
use EasySwoole\RedisPool\RedisPool;
use EasySwoole\Rpc\ServiceNode;
use EasySwoole\Utility\Random;
class RedisManager implements NodeManagerInterface
{
protected $redisKey;
protected $pool;
function __construct(RedisPool $pool, string $hashKey = 'rpc')
{
$this->redisKey = $hashKey;
$this->pool = $pool;
}
function getServiceNodes(string $serviceName, ?string $version = null): array
{
$redis = $this->pool->getObj(15);
try {
$nodes = $redis->hGetAll("{$this->redisKey}_{$serviceName}");
$nodes = $nodes ?: [];
$ret = [];
foreach ($nodes as $nodeId => $node) {
$node = new ServiceNode(json_decode($node,true));
/**
* @var $nodeId
* @var ServiceNode $node
*/
if (time() - $node->getLastHeartBeat() > 30) {
$this->deleteServiceNode($node);
}
if ($version && $version != $node->getServiceVersion()) {
continue;
}
$ret[$nodeId] = $node;
}
return $ret;
} catch (\Throwable $throwable) {
//如果該redis斷線則銷毀
$this->pool->unsetObj($redis);
} finally {
$this->pool->recycleObj($redis);
}
return [];
}
function getServiceNode(string $serviceName, ?string $version = null): ?ServiceNode
{
$list = $this->getServiceNodes($serviceName, $version);
if (empty($list)) {
return null;
}
return Random::arrayRandOne($list);
}
function deleteServiceNode(ServiceNode $serviceNode): bool
{
$redis = $this->pool->getObj(15);
try {
$redis->hDel($this->generateNodeKey($serviceNode), $serviceNode->getNodeId());
return true;
} catch (\Throwable $throwable) {
$this->pool->unsetObj($redis);
} finally {
$this->pool->recycleObj($redis);
}
return false;
}
function serviceNodeHeartBeat(ServiceNode $serviceNode): bool
{
if (empty($serviceNode->getLastHeartBeat())) {
$serviceNode->setLastHeartBeat(time());
}
$redis = $this->pool->getObj(15);
try {
$redis->hSet($this->generateNodeKey($serviceNode), $serviceNode->getNodeId(), $serviceNode->__toString());
return true;
} catch (\Throwable $throwable) {
$this->pool->unsetObj($redis);
} finally {
//這邊需要測試一個對象被unset后是否還能被回收
$this->pool->recycleObj($redis);
}
return false;
}
protected function generateNodeKey(ServiceNode $node)
{
return "{$this->redisKey}_{$node->getServiceName()}";
}
}
即使關閉了UDP
定時廣,EasySwoole Rpc
的tick
進程依舊會每3秒執行一次serviceNodeHeartBeat
用于更新自身的節點心跳信息。