如何用 Thrift 写一个服务

版权声明:本文所有内容都是原创,如有转载请注明出处,谢谢。

什么是Thrift

Thrift是Facebook开源的软件,据我的理解,Facebook, Google, Amazon,LinkedIn那些家伙总是觉得世界上的轮子都不好用,只有他们的轮子才是最好的。所以他们经常创造一些轮子出来。不过真的承认,他们的轮子真的比其他的好用呢。Thrift就是一个这么好用的轮子。那么,Thrift有什么好处呢?

  • 利用Thrift的多语言特性,可以很容易的定义日志格式。这样的日志格式可以在C++语言生成,落地到HDFS,然后再用java语言去访问,简直太方便了。
  • 利用Thrift的服务功能,可以很容易的定义好RPC的接口。然后用C++或者Java实现这个服务,并在服务器上跑起来。其他人不管是用PHP,C++,Java,都可以直接调用这个RPC服务。

目前,Thrift支持的语言包括C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, OCaml and Delphi和其他语言。

Thrift语法结构

Thrift是有自己的语言结构的,按照他的语法结构定义好文件。然后他就能生成相应的源代码。基本类型: bool, byte, i16, i32, i64, double, string,基本上常见的数据类型都有。唯一不支持的数据类型就是float。 所以如果定义浮点类型,则只能定义double类型。在RPC服务中,因为不同机器的关系,浮点类型也不常用。一般情况下,也都是将浮点类型转成整型,保留小数点后N位。

  • 容器: list, set, map。
  • 结构:struct,结构的定义类似于C语言。
  • 服务:service, 类似于接口,定义好客户端可以RPC函数。注意Thrift的函数名称是不可以overload的。 也就是说每个函数的名称都必须是唯一的。这个也不是一个什么大的问题。
  • 其他一些功能: typedef, namespace, const, include其他thrift文件,注释等等

实验

在这个实验中,我们首先定义几个简单的结构体。然后实现服务器端和client端的通信。结构体的定义如下,主要是定义了Tweet这个结构体。 并且定义了Twitter这个服务,他有一个postTweet的方法。用户可以向服务器不断地发送请求,服务器可以把请求记录到日志或者写入数据库。我这里就直接打印出来就好了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
namespace cpp com.nncao.tweet
namespace java com.nncao.tweet
enum TweetType {
  TWEET,
  RETWEET = 2,
  DM,
  REPLY
}
struct Location {
1: required double latitude;
2: required double longitude;
}
struct Tweet {
1: required i32 userId;
2: required string userName;
3: required string text;
4: optional Location loc;
5: optional TweetType type = TweetType.TWEET;
6: optional string language="english";
}
service Twitter {
bool postTweet( 1:Tweet tweet );
}

利用这个文件,我们就可以用thrift编译出来C++和Java文件。利用编译出来的文件,我们需要完成服务器端和客户端的实现。这里是实现类,定义了服务端如何处理每个请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.nncao.tweet;
/** A class to handle tweets from clients
 * @author Michael Cao
 * @version 2014-06-19
 */
public class TweetImp implements Twitter.Iface {
  @Override
  public boolean postTweet( Tweet tweet ) {
      //you can write this tweet to db or what
      System.out.print( tweet.getUserId() + "\t" );
      System.out.print( tweet.getUserName() + "\t" );
      System.out.print( tweet.getText() + "\n");
      return true;
  }
}

利用上面的实现类,然后再把服务器定义好。服务器端主要需要定义服务器的类型,传输协议的类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.nncao.tweet;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
/** A class to starts the server and provides service
 * @author Michael Cao
 * @version 2014-06-19
 */
public class TweetServer{
  public static void main( String [] args ) {
        try {
            TServerSocket serverSocket = new TServerSocket( 32586 );
            TProcessor processor = new Twitter.Processor( new TweetImp());
            TThreadPoolServer.Args threadArgs = new TThreadPoolServer.Args(serverSocket);
            TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
            threadArgs.processor(processor);
            threadArgs.protocolFactory(factory);
            threadArgs.maxWorkerThreads( 10 );
            TServer server = new TThreadPoolServer( threadArgs );
            System.out.println( "Server is serving ... ");
            server.serve();
        } catch (TTransportException e ) {
            System.out.println("error: " + e);
            System.exit(-1);
        }
  }
}

这样,服务器端就可以跑起来了。下面是客户端,客户端每次会发10个请求到服务器。如果需要可以判断每个请求是否成功,但在这里,并没有判断。也可以自己定义一些需要的返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.nncao.tweet;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
/** A client to post tweet to server
 * @author Cao Nannan
 * @version 2014-06-21
 */
public class TweetClient {
    public static void main( String [] args ) {
        try {
            TTransport transport = new TSocket( "localhost", 32586 );
            transport.open();
           for( int i = 0; i < 10; i++ ) {
               TProtocol protocol = new TBinaryProtocol( transport );
               Twitter.Client client = new Twitter.Client(protocol);
               Tweet tweet = new Tweet(i, "nncao", "this is tweet " + i);
               client.postTweet(tweet);
           }
            transport.close();
        } catch ( TTransportException e ) {
            System.out.println( "error: " + e);
            System.exit( -1 );
        } catch ( TException e ) {
            System.out.println( "error: " + e);
            System.exit( -1 );
        }
    }
}

这样,整个服务就完成了。