Thrift Thrift 实战 服务调用结构图
创建项目文件目录 match_system、game、thrift
和 readme.md
文件
thrift
目录用于存放 远程调用接口文件
match_system
:是匹配系统
game
:是游戏系统
1、在 thrift
下编写 match.thrift
文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 namespace cpp match_service struct User { 1: i32 id, 2: string name, 3: i32 score } service Match { i32 add_user(1: User user, 2: string info), i32 remove_user(1: User user, 2: string info), }
2、在 match_system/src
下通过 match.thrift
文件生成 C++ 服务端客户端代码,但我们只用服务端
1 2 thrift -r --gen cpp ../../thrift/match.thrift
生成的代码目录
1 2 3 4 5 6 7 8 9 acs@a7e3435d46dd:~/homework/lesson_6/thrift_lesson/match_system/src$ tree . . `-- gen-cpp |-- Match.cpp |-- Match.h |-- Match_server.skeleton.cpp |-- match_types.cpp `-- match_types.h
gen-cpp
文件夹改名为 match_server
作为匹配系统的 server 端
把生成的服务端代码 Match_server.skeleton.cpp
移动到 src
目录下并改名为 main.cpp
3、修改 main.cpp
写自己的代码逻辑,Match Server 1.0
先给两个函数添加 return 0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 // This autogenerated skeleton file illustrates how to build a server. // You should copy it to another filename to avoid overwriting it. #include "match_server/Match.h" #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/server/TSimpleServer.h> #include <thrift/transport/TServerSocket.h> #include <thrift/transport/TBufferTransports.h> #include <iostream> using namespace ::apache::thrift; using namespace ::apache::thrift::protocol; using namespace ::apache::thrift::transport; using namespace ::apache::thrift::server; using namespace ::match_service; using namespace std; class MatchHandler : virtual public MatchIf { public: MatchHandler() { // Your initialization goes here } int32_t add_user(const User& user, const std::string& info) { // Your implementation goes here printf("add_user\n"); return 0; } int32_t remove_user(const User& user, const std::string& info) { // Your implementation goes here printf("remove_user\n"); return 0; } }; int main(int argc, char **argv) { int port = 9090; ::std::shared_ptr<MatchHandler> handler(new MatchHandler()); ::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler)); ::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port)); ::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory()); ::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory); cout << "Start Match Server" << endl; server.serve(); return 0; }
编译测试
编译: g+t -c main.cpp match server/*.cpp
,编译所有的 .cpp
文件
链接: g++ *.o -o main -lthrift
,将所有 .o
文件链接起来,-lthrift
是 thrift 的动态链接库
小技巧 :写 thrift 代码先编译跑通环境,再逐步往里添加模块
4、进入 game/src
目录生成 python 服务端客户端代码,只用客户端
1 2 thrift -r --gen py ../../thrift/match.thrift
生成的代码目录
1 2 3 4 5 6 7 8 9 . |-- __init__.py `-- match |-- Match-remote # python 服务器端代码 |-- Match.py # python 客户端代码 |-- __init__.py |-- constants.py `-- ttypes.py
gen-cpp
文件夹改名为 match_client
作为匹配系统的 client 端
Match-remote
用不到可以删掉
创建客户端逻辑文件 client.py
,从官网 Tutorial 中复制修改。修改成从终端读入操作,客户端搞定!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 from match_client.match import Match from match_client.match.ttypes import User from thrift import Thrift from thrift.transport import TSocket from thrift.transport import TTransport from thrift.protocol import TBinaryProtocol # 从终端读入内容 from sys import stdin def operate(op, user_id, username, score): # Make socket transport = TSocket.TSocket('localhost', 9090) # Buffering is critical. Raw sockets are very slow transport = TTransport.TBufferedTransport(transport) # Wrap in a protocol protocol = TBinaryProtocol.TBinaryProtocol(transport) # Create a client to use the protocol encoder client = Match.Client(protocol) # Connect! transport.open() user = User(user_id, username, score) if op == "add": client.add_user(user, "") elif op == "remove": client.remove_user(user, "") # Close! transport.close() def main(): # 从终端中读入操作 for line in stdin: op, user_id, username, score = line.split(' ') operate(op, int(user_id), username, int(score)) if __name__ == "__main__": main()
4、完善 main.cpp
,Match Server 2.0,开一个线程去执行匹配任务,同时主线程去和客户端通信。
需要用到锁、条件变量来进行线程同步
匹配逻辑:每次取匹配池里的前两名玩家进行匹配。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 // This autogenerated skeleton file illustrates how to build a server. // You should copy it to another filename to avoid overwriting it. #include "match_server/Match.h" #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/server/TSimpleServer.h> #include <thrift/transport/TServerSocket.h> #include <thrift/transport/TBufferTransports.h> #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <vector> using namespace ::apache::thrift; using namespace ::apache::thrift::protocol; using namespace ::apache::thrift::transport; using namespace ::apache::thrift::server; using namespace ::match_service; using namespace std; struct Task { User user; string type; }; struct MessageQueue { queue<Task> q; mutex m; condition_variable cv; }message_queue; class Pool { public: void save_result(int a, int b) { printf("Match Result: % d %d\n", a, b); } void match() { while (users.size() > 1) { auto a = users[0], b = users[1]; users.erase(users.begin()); users.erase(users.begin()); save_result(a.id, b.id); } } void add(User user) { users.push_back(user); } void remove(User user) { for (uint32_t i = 0; i < users.size(); i ++ ) if (users[i].id == user.id) { users.erase(users.begin() + i); break; } } private: vector<User> users; }pool; class MatchHandler : virtual public MatchIf { public: MatchHandler() { // Your initialization goes here } int32_t add_user(const User& user, const std::string& info) { // Your implementation goes here printf("add_user\n"); unique_lock<mutex> lck(message_queue.m); // 为消息队列加锁,函数退出会自动解锁 message_queue.q.push({user, "add"}); message_queue.cv.notify_all(); // 唤醒所有被阻塞的线程 return 0; } int32_t remove_user(const User& user, const std::string& info) { // Your implementation goes here printf("remove_user\n"); unique_lock<mutex> lck(message_queue.m); // 加锁,两个方法同时只能有一个方法可以获取到消息队列的锁 message_queue.q.push({user, "remove"}); message_queue.cv.notify_all(); // 删除也是一个任务,也需要唤醒所有被阻塞的线程 return 0; } }; void consume_task() { while (true) { unique_lock<mutex> lck(message_queue.m); if (message_queue.q.empty()) { message_queue.cv.wait(lck); // 阻塞当前线程,等待唤醒 } else { auto task = message_queue.q.front(); message_queue.q.pop(); lck.unlock(); // 解锁 // do task if (task.type == "add") pool.add(task.user); else if (task.type == "remove") pool.remove(task.user); pool.match(); } } } int main(int argc, char **argv) { int port = 9090; ::std::shared_ptr<MatchHandler> handler(new MatchHandler()); ::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler)); ::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port)); ::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory()); ::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); cout << "Start Match Server" << endl; // 开一个线程执行匹配任务 thread matching_thread(consume_task); TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory); server.serve(); return 0; }
我们只修改了 main.cpp
,所以只需要编译 main.cpp 即可
5、在 match_server 端调用远程保存数据服务,所以需要创建保存数据客户端 save_client
在 thrift
目录下定义 save.thrift
接口
1 2 3 4 5 6 7 8 9 10 namespace cpp save_service service Save { # username: myserver的名称 # password: myserver的密码的md5sum的前8位 # 用户名密码验证成功会返回0,验证失败会返回1 # 验证成功后,结果会被保存到myserver:homework/lesson_6/result.txt中 i32 save_data(1: string username, 2: string password, 3: i32 player1_id, 4: i32 player2_id) }
获取密码
首先 homework 4 getinfo
,获取用户名 ip 地址和密码
然后输入 md5sum
命令,输入密码,按 Ctrl + d
生成加密后的密码,取前 8 位。
match_server 端
match_client 端
myserver 端
main.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 // This autogenerated skeleton file illustrates how to build a server. // You should copy it to another filename to avoid overwriting it. #include "match_server/Match.h" #include "save_client/Save.h" #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/server/TSimpleServer.h> #include <thrift/transport/TServerSocket.h> #include <thrift/transport/TBufferTransports.h> #include <thrift/transport/TTransportUtils.h> #include <thrift/transport/TSocket.h> #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <vector> using namespace ::apache::thrift; using namespace ::apache::thrift::protocol; using namespace ::apache::thrift::transport; using namespace ::apache::thrift::server; using namespace ::match_service; using namespace ::save_service; using namespace std; struct Task { User user; string type; }; struct MessageQueue { queue<Task> q; mutex m; condition_variable cv; }message_queue; // 玩家池 class Pool { public: void save_result(int a, int b) { printf("Match Result: %d %d\n", a, b); // myserver 的 ip 地址 std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090)); std::shared_ptr<TTransport> transport(new TBufferedTransport(socket)); std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); SaveClient client(protocol); try { transport->open(); // 通过 myserver 用户名和加密后的密码,才能把数据保存到 myserver 的 result.txt 上 int res = client.save_data("acs_4851", "c4cd762a", a, b); // if (!res) puts("success"); // else puts("failed"); transport->close(); } catch (TException& tx) { cout << "ERROR: " << tx.what() << endl; } } void match() { while (users.size() > 1) { auto a = users[0], b = users[1]; users.erase(users.begin()); users.erase(users.begin()); save_result(a.id, b.id); } } void add(User user) { users.push_back(user); } void remove(User user) { for (uint32_t i = 0; i < users.size(); i ++ ) if (users[i].id == user.id) { users.erase(users.begin() + i); break; } } private: vector<User> users; }pool; class MatchHandler : virtual public MatchIf { public: MatchHandler() { // Your initialization goes here } int32_t add_user(const User& user, const std::string& info) { // Your implementation goes here printf("add_user\n"); unique_lock<mutex> lck(message_queue.m); message_queue.q.push({user, "add"}); // 唤醒 message_queue.cv.notify_all(); return 0; } int32_t remove_user(const User& user, const std::string& info) { // Your implementation goes here printf("remove_user\n"); unique_lock<mutex> lck(message_queue.m); message_queue.q.push({user, "remove"}); // 唤醒 message_queue.cv.notify_all(); return 0; } }; void consume_task() { while (true) { unique_lock<mutex> lck(message_queue.m); if (message_queue.q.empty()) { // 等待 message_queue.cv.wait(lck); } else { auto task = message_queue.q.front(); message_queue.q.pop(); // 解锁 lck.unlock(); // do task if (task.type == "add") pool.add(task.user); else if (task.type == "remove") pool.remove(task.user); pool.match(); } } } int main(int argc, char **argv) { int port = 9090; ::std::shared_ptr<MatchHandler> handler(new MatchHandler()); ::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler)); ::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port)); ::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory()); ::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory); cout << "Start Match Server" << endl; // 开线程 thread matching_thread(consume_task); server.serve(); return 0; }
6、继续完善 main.cpp
匹配逻辑 Match Server 3.0,改为每 1s 钟匹配一次,发现两名玩家分值差小于等于 50 就匹配成功。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 // 玩家池 class Pool { public: ... void match() { while (users.size() > 1) { sort(users.begin(), users.end(), [](User& a, User& b){ return a.score < b.score; }); bool flag = true; for (uint32_t i = 1; i < users.size(); i ++ ) { auto a = users[i - 1], b = users[i]; if (b.score - a.score <= 50) { users.erase(users.begin() + i - 1, users.begin() + i + 1); save_result(a.id, b.id); flag = false; break; } } if (flag) break; } } ... private: vector<User> users; }pool;
完整代码
match_server 端
match_client 端
7、继续完善 main.cpp
匹配逻辑 Match Server 4.0,将原来的单线程匹配升级为多线程。每次调用 consume_task
匹配函数就新建一个线程。
原来使用的是 TSimpleServer
,简单版的 Server,现在改用 TThreadServer
(T - Thrift)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 int main(int argc, char **argv) { TThreadedServer server( std::make_shared<MatchProcessorFactory>(std::make_shared<MatchCloneFactory>()), std::make_shared<TServerSocket>(9090), //port std::make_shared<TBufferedTransportFactory>(), std::make_shared<TBinaryProtocolFactory>()); cout << "Start Match Server" << endl; // 开线程 thread matching_thread(consume_task); server.serve(); return 0; }
还需要添加一个工厂
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class MatchCloneFactory : virtual public MatchIfFactory { public: ~MatchCloneFactory() override = default; MatchIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) override { std::shared_ptr<TSocket> sock = std::dynamic_pointer_cast<TSocket>(connInfo.transport); /*cout << "Incoming connection\n"; cout << "\tSocketInfo: " << sock->getSocketInfo() << "\n"; cout << "\tPeerHost: " << sock->getPeerHost() << "\n"; cout << "\tPeerAddress: " << sock->getPeerAddress() << "\n"; cout << "\tPeerPort: " << sock->getPeerPort() << "\n";*/ return new MatchHandler; } void releaseHandler(MatchIf* handler) override { delete handler; } };
另外需要替换一些变量名,将 Calculator
替换成 Match
,完整代码
8、终极版 ,在第 6 步的基础上继续完善 main.cpp
匹配逻辑,原来的匹配条件是如果两个人的分支相差不超过 50 就匹配成功,但这种实现存在问题,比如当前匹配池中有两名玩家,一个 1000 分一个 500 分,在没有其他玩家参与的情况下,这两名玩家将永远不会匹配成功,这是不合理的。
所以我们可以添加等待时间机制,在等待一段时间后,还没能匹配成功的话,就将他两匹配到一起。
具体逻辑:
每个玩家添加绑定一个等待时间,每等一秒就将分值扩大 50
如果此时有两个玩家的分值都满足可以接收的差值,就匹配成功
main.cpp
匹配逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 bool check_match(uint32_t i, uint32_t j) { auto a = users[i], b = users[j]; int dt = abs(a.score - b.score); int a_max_diff = wt[i] * 50; // 每等待 1s 扩大 50 分差 int b_max_diff = wt[i] * 50; return dt <= a_max_diff && dt <= b_max_diff; } void match() { for (uint32_t i = 0; i < wt.size(); i ++ ) wt[i] ++ ; // 表示等待秒数 + 1 while (users.size() > 1) { bool flag = true; for (uint32_t i = 0; i < users.size(); i ++ ) { for (uint32_t j = i + 1; j < users.size(); j ++ ) { if (check_match(i, j)) { auto a = users[i], b = users[j]; users.erase(users.begin() + j); // 将两名玩家删除,先删后面的,再删前面的,防止下标变化 users.erase(users.begin() + i); wt.erase(wt.begin() + j); // 将两名玩家从等待数组中删除 wt.erase(wt.begin() + i); save_result(a.id, b.id); flag = false; break; } } if (!flag) break; } if (flag) break; } }
再修改 add_user()
和 remove_user()
方法,添加等待数组的添加和删除。
xujiaojiao
生活明朗,万物可爱
此文章版权归xujiaojiao所有,如有转载,请注明来自原作者