Chalk是一个Java开发的XMPP客户端通讯库,可以用于开发Java桌面和Android的XMPP客户端。Chalk基于插件体系设计,这使得它易于使用及易于扩展。
Chalk是一个纯粹的Java库,在使用它时,我们需要将它配置为工程的依赖库。以Maven工程为例,我们需要做以下的配置。
在pom.xml中添加FirstLineCode的Maven仓库。
<repository> <id>com.thefirstlineofcode.release</id> <name>TheFirstLineOfCode Repository - Releases</name> <url>http://repo.thefirstlineofcode.com/content/repositories/releases</url> </repository> Chalk库被设计成插件架构,以保证良好的扩展性,不同的功能被封装在不同的插件依赖库里,开发者可以根据自己需要,选择要使用的依赖。
我们最少需要配置以下的基础依赖。
<dependency> <groupId>com.thefirstlineofcode.chalk</groupId> <artifactId>com.thefirstlineofcode.chalk</artifactId> <version>0.2.0-RELEASE</version> </dependency> 可以根据需要添加可选的插件依赖,例如添加在线注册(In Band Registration)插件依赖。
<dependency> <groupId>com.thefirstlineofcode.chalk.xeps</groupId> <artifactId>com.thefirstlineofcode.chalk.xeps.ibr</artifactId> <version>0.2.0-RELEASE</version> </dependency> 下表列出了Chalk提供了的常用插件。
| 插件名 | groupId | artifactId | 插件描述 | 备注 |
|---|---|---|---|---|
| IM | com.thefirstlineofcode.chalk | com.thefirstlineofcode.chalk | 实现RFC3921(Instant Messaging and Presence) | IM插件内置在基础依赖包中 |
| Ping | com.thefirstlineofcode.chalk.xeps | com.thefirstlineofcode.chalk.xeps.ping | 实现XEP-0199(XMPP Ping) | |
| IBR | com.thefirstlineofcode.chalk.xeps | com.thefirstlineofcode.chalk.xeps.ibr | 实现XEP-0077(In-Band Registration) | |
| MUC | com.thefirstlineofcode.chalk.xeps | com.thefirstlineofcode.chalk.xeps.muc | 实现XEP-0045(Multi-User Chat) | |
| LEP IM | com.thefirstlineofcode.chalk.leps | com.thefirstlineofcode.chalk.leps.im | 实现LEP-0011(Traceable Message) | 非标准协议,解决XMPP IM的一些缺陷,如:双向订阅、可信赖消息;消息状态跟踪等 |
使用Chalk库最重要的接口就是IChatClient,IChatClient提供了两个核心的功能:
- 建立客户端到服务器端的连接(Stream)。
- 创建插件的API,应用开发者通过插件API来使用库提供的各种功能。
根据XMPP协议,在客户端-服务器之间,我们需要:(1)先建立一个信息通道(Stream) 。(2)在建立好的Stream上,交换信息(Stanza)。
使用以下的代码,可以建立Stream。
StreamConfig config = new StreamConfig("im.thefirstlineofcode.com", 5222); config.setResource("my_android_mobile"); IChatClient chatClient = new StandardChatClient(config); try { chatClient.connect("my_user_name", "my_password"); } catch (ConnectionException e) { throw new RuntimeException("can't connect to host", e); } catch (AuthFailureException e) { throw new RuntimeException("auth failed", e); } XMPP规范定义了Stream建立的几个协商阶段:
- Initial Stream
- TLS
- SASL
- Resource Binding
- Session Establishment
如果需要监控Stream建立的细节,可以使用INegotiationListener。
chatClient.addNegotiationListener(new INegotiationListsener() { public void before(IStreamNegotiant source) { if (source instanceof TlsNegotiation) { System.out.println("Ready to negotiate TLS"); } } public void after(IStreamNegotiant source) { if (source instanceof TlsNegotiation) { System.out.println("TLS negotiation has done"); } } public void occurred(NegotiationException exception) { IStreamNegotiant source = exception.getSource(); if (source instnaceof SaslNegotiant) { SaslError saslError = (SaslError)exception.getAdditionalErrorInfo(); System.out.println("Error occured in SASL negotiation. Additional error info: " + saslError); } } public void done(IStream stream) { System.out.println("Stream has created"); } }); chatClient.connect("my_user_name", "my_password"); 在TLS协商过程中,如果客户端需要检查服务器证书有效性,可以使用IPeerCertificateTruster。
((StandardChatClient)chatClient).setPeerCertificateTruster(new IPeerCertificateTruster() { public boolean accept(X509Certificate[] certificates) { // check certificates } } ); XMPP是一组协议族,包括RFC协议(例如:RFC3920、RFC3921)和XEPs(例如:XEP-0045 Multi-User Chat、XEP-0077 In-Band Registration)协议。在很多情况下,用户甚至可能会基于XMPP协议标准,定义自己的私有协议。
Chalk基于插件架构设计,以对应XMPP的扩展性和灵活性。除了少数基础功能(例如:建立Stream)之外,Chalk的所有功能都由插件来提供。
Chalk基于一个简单、灵活的插件框架,使用插件功能,一般使用插件需要以下的步骤:
- 注册插件
- 获取插件API
- 使用插件功能
我们用一个简单的插件PingPlugin(实现XEP-0199 XMPP Ping)来演示插件的使用方法。
使用插件前,需要先注册插件,IChatClient提供了注册插件的接口。
注册PingPlugin的代码如下。
chatClient.register(PingPlugin.class); 注册插件后,就可以获取插件的API。每个插件根据自己协议的细节,设计并提供API,应用开发者需要通过对应的文档了解相关细节。
Ping插件提供了一个名为IPing的接口,定义如下。
public interface IPing { public enum Result { PONG, SERVICE_UNAVAILABLE, TIME_OUT } Result ping(); void setTimeout(int timeout); int getTimeout(); } 获取IPing接口的代码如下。
IPing ping = chatClient.createApi(IPing.class); ping.setTimeout(4 * 1000); // Set timeout to the ping operation. Default is 2 * 1000ms. Result result = ping.ping(); // Ping the server and waiting for result. if (result == Ping.Result.PONG) { System.out.println("Pong."); } else if (result == SERVICE_UNAVAILABLE) { System.out.println("Server doesn't support the protocol."); } else { System.out.println("Ping timed out."); } Chalk的一个重要设计目的,是为了充分体现XMPP协议的灵活性及扩展性,以便于我们可以根据自己需要,灵活的定义和开发IM通讯协议。
XMPP协议为一组松散的协议族,除了RFC3920 XMPP Core,RFC3921 XMPP IM之外,其它的协议(主要是XEPs)被视为可选的协议。不同的IM产品会选择实现其中一些协议, Chalk也实现了部分XMPP标准协议,并提供一些改善标准XMPP的非标准协议扩展。
以下为Chalk已经实现的插件(协议)。
IbrPlugin实现了XEP-0077(In-Band Registration),为客户端提供在线注册功能。
chaClient.register(IbrPlugin.class); try { IRegistration registration = chatClient.createApi(IRegistration.class); registration.register(new IRegistrationCallback() { public Object fillOut(IqRegister iqRegister) { if (iqRegister.getRegister() instanceof RegistrationForm) { RegistrationForm form = new RegistrationForm(); form.getFields().add(new RegistrationField("username", "my_user_name")); form.getFields().add(new RegistrationField("password", "my_password")); return form; } else { throw new RuntimeException("Can't get registration form"); } } }); } catch(RegistrationException e) { IbrError error = e.getError(); if (error instanceof IbrError.Conflict) { System.out.println("User has existed. Please change your name."); } else if (error instanceof IbrError.NOT_ACCEPTABLE) { System.out.println("Illegal user name. Please change your name."); } else { System.out.println("Registration failed."); } } InstantingMessengerPlugin实现了RFC3921(Instant Messaging and Presence),提供Roster管理、Subscription管理,及发送接收Presence和Message的功能。
为简化文档,我们在后续说明中,可能会使用IM插件作为InstantingMessengerPlugin的同义词,当提及IM插件时,意味着是InstantingMessengerPlugin。
chaClient.register(InstantingMessengerPlugin.class); 在成功建立Stream后,根据RFC3921要求,客户端应该要立即发送一个Initial Presence,服务器端在收到Initial Presence后,才会将客户端的状态置为Available。
IInstantingMessenger im = chatClient.createApi(IInstantingMessenger.class); im.send(new Presence()); // Send initial presence 在此后任何时刻,都可以通过发送Presence,更改自己的当前状态。
Presence presence = new Presence(Show.DND); presence.getStatuses().add(new LangText("I'm being in a meeting.")); im.send(presence); 可以通过IPresenceListener监听联系人的Presence变化。
im.addPresenceListener(new IPresenceListener() { public void received(Presence presence) { System.out.println("Contact " + presence.getFrom() + " changed it's presence to " + presence.toString()); } }); XMPP IM协议中,使用Roster来管理联系人列表。IM插件提供IRosterService和IRosterListener来管理Roster。
使用IRosterService的retrieve()方法来从服务器端获取Roster列表。
IRosterService rosterService = im.getRosterService(); rosterService.addRosterListener(new IRosterListener() { void retrieved(Roster roster) { // Process the roster that is retrieved from server. } void occurred(RosterError error) { // An error occurred } ... }); rosterService.retrieve(); 注意:retrieve()是一个异步方法,并不会直接返回结果,所以我们需要注册一个IRosterListener来监听获取的结果。
IRosterListener还提供了updated()和deleted()方法,可以用于监听Roster的变更。
rosterService.addRosterListener(new IRosterListener() { ... public void updated(Roster roster) { // Process the updated roster } public void deleted(Roster roster) { // process the deleted roster } ... }); 大部分时候,我们并不需要直接变更Roster,Roster管理往往和Subscription管理相关,当Subscription状态变更时,会自动导致Roster变更。
在某些情况下,我们需要直接变更Roster,例如修改用户的分组,IRosterService提供了以下的方法。
public interface IRosterService { ... void add(Roster roster); void update(Roster roster); void delete(Roster roster); ... } XMPP IM协议使用Subscription来管理联系人之间的关联关系。IM插件提供了ISubscriptionService和ISubscriptionListener来管理Subscription。
JabberId contact = JabberId.parse("smartsheep@im.thefirstlineofcode.com"); ISubscriptionService subscriptionService = im.getSubscriptionService(); subscriptionService.subscribe(contact); 注册ISubscriptionListener,可以接收订阅消息。
subscriptionService.addSubscriptionListener(new ISubscriptionListener() { ... public void asked(JabberId user) { System.out.println("User " + user + " wants to subscribe you."); } ... }); 如果用户决定通过对方的订阅,可以使用以下代码。
subscriptionService.approve(contact); 如果拒绝对方订阅,可以使用以下代码。
subscriptionService.refuse(contact); 使用ISubscriptionListener监听订阅反馈信息。
subscriptionService.addSubscriptionListener(new ISubscriptionListener() { ... public void approved(JabberId contact) { System.out.println("User " + contact + " approved your subscription."); } public void refused(JabberId contact) { System.out.println("User " + user + " refused your subscription."); } ... }); JabberId contact = JabberId.parse("agilest@im.thefirstlineofcode.com"); Message message = new Message("Hello, Agilest!"); message.setTo(contact); im.send(message); 或者,采用更简洁的方式。
JabberId contact = JabberId.parse("agilest@im.thefirstlineofcode.com"); im.send(contact, new Message("Hello, Agilest!")) im.addMessageListener(new IMessageListener() { public void received(Message message) { System.out.println("Received a message from user " + message.getFrom()); } }); InstantingMessengerPlugin2实现了LEP-0011(Traceable Message)。主要是提供可靠消息服务,以及可以跟踪消息状态。
LEP-0011是非标准协议,需要支持LEP协议的服务器配合,例如:Granite XMPP Server。
chaClient.register(InstantingMessengerPlugin2.class); InstantingMessengerPlugin2插件,使用IMessageListener2的接口来替代标准的IMessageListener。IMessageListeners接口提供了一个traced方法来跟踪消息状态。
IInstantingMessenger2 im2 = chatClient.createApi(IInstantingMessenger2.class); im2.addMessageListener(new IMessageListener2() { public void received(Message message) { System.out.println("Received a message from user " + message.getFrom()); } public void traced(Trace trace) { for (MsgStatus status : trace.getMsgStatuses()) { if (status.getStatus() == MsgStatus.Status.SERVER_REACHED) { System.out.println("Message which id is " + status.getId() + " has reached server at time " + status.getStamp()); } else if (status.getStatus() == MsgStatus.Status.PEER_REACHED) { System.out.println("Message which id is " + status.getId() + " has reached peer at time " + status.getStamp()); } else { // status.getStatus() == MsgStatus.Status.MESSAGE_READ System.out.println("Message which id is " + status.getId() + " has read by contact at time " + status.getStamp()); } } } }); MucPlugin实现了XEP-0045(Multi-User Chat),提供聊天室多人聊天功能。
chaClient.register(MucPlugin.class); IMucService muc = chatClient.createApi(IMucService.class); JabberId[] hosts = muc.getMucHosts(); JabberId[] rooms = muc.getMucRooms(); IRoom room = muc.getRoom(roomJid); RoomInfo roomInfo = room.getRoomInfo(); room.enter("my_nick_name"); room.exit(); Occupant[] occupants = room.getOccupants(); 可以给聊天室发送消息,聊天室里的所有用户都能收到该消息。
room.send(new Message("Hello, everyone!")); 可以发送私聊消息给聊天室中某个用户。
room.send("user_nick_name", new Message("Hello, everyone!")); JabberId roomJid = JabberId.parse("my_chat_room_name@im.thefirstlineofcode.com"); muc.createInstantRoom(roomJid, "my_nick_name"); JabberId roomJid = JabberId.parse("my_chat_room_name@im.thefirstlineofcode.com"); muc.createReservedRoom(roomJid, "my_nick_name", new StandardRoomConfigurator() { protected RoomConfig configure(RoomConfig roomConfig) { roomConfig.setRoomName("my first room"); roomConfig.setRoomDesc("Hope you have happy hours here!"); roomConfig.setMembersOnly(true); roomConfig.setAllowInvites(true); roomConfig.setPasswordProtectedRoom(true); roomConfig.setRoomSecret("simple_password"); roomConfig.getGetMemberList().setParticipant(false); roomConfig.getGetMemberList().setVisitor(false); roomConfig.setWhoIs(WhoIs.MODERATORS); roomConfig.setModeratedRoom(true); return roomConfig; } } ); JabberId myColleague = JabberId.parse("my_colleague_name@im.thefirstlineofcode.com"); room.invite(myColleague, "Let's discuss our plan") 通过IRoomListener可以监听多种Room相关的事件。当Room产生事件时,IRoomListener接收到RoomEvent类型的对象。
RoomEvent对象有两个关键属性,roomJid和eventObject,roomJid表示Event来自哪个Rooom,而eventObject则根据RoomEvent类型的不同,可以是不同类型的对象,表示Event的具体细节。
处理RoomEvent的代码大概如下。
muc.addRoomListener(new IRoomListener() { public void received(RoomEvent<?> event) { if (event instanceof InvitationEvent) { InvitationEvent invitationEvent = (InvitationEvent)event; JabberId roomJid = invitationEvent.getRoomJid(); Invitation invitation = invitationEvent.getEventObject(); System.out.println(String.format("'%s' invites you to join room '%s'", invitation.getInvitor(), roomJid)); } else if (event instanceof EnterEvent) { Enter enter = ((EnterEvent)event).getEventObject(); Occupant occupant = muc.getRoom(event.getRoomJid()).getOccupant(enter.getNick()); int sessions = occupant == null ? 0 : occupant.getSessions(); System.out.println(String.format("'%s'[sessions:%d] has joined room '%s'", enter.getNick(), sessions, event.getRoomJid())); } else if (event instanceof ExitEvent) { Exit exit = ((ExitEvent)event).getEventObject(); Occupant occupant = muc.getRoom(event.getRoomJid()).getOccupant(exit.getNick()); int sessions = occupant == null ? 0 : occupant.getSessions(); System.out.println(String.format("'%s'[sessions:%d] has exited room '%s'", exit.getNick(), sessions, event.getRoomJid())); } else if (event instanceof ChangeAvailabilityStatusEvent) { ChangeAvailabilityStatus changeAvailabilityStatus = ((ChangeAvailabilityStatusEvent)event).getEventObject(); System.out.println(String.format("'%s' has changed it's availability status to: %s", changeAvailabilityStatus.getNick(), getAvailabilityStatus(changeAvailabilityStatus))); } else if (event instanceof RoomMessageEvent) { RoomMessageEvent messageEvent = (RoomMessageEvent)event; System.out.println(String.format("groupchat message received[from '%s' at room '%s']: %s", messageEvent.getEventObject().getNick(), messageEvent.getRoomJid(), messageEvent.getEventObject().getMessage())); } else if (event instanceof PrivateMessageEvent) { PrivateMessageEvent privateMessageEvent = (PrivateMessageEvent)event; System.out.println(String.format("groupchat private message received[from '%s' at room '%s']: %s", privateMessageEvent.getEventObject().getNick(), privateMessageEvent.getRoomJid(), privateMessageEvent.getEventObject().getMessage())); } else if (event instanceof DiscussionHistoryEvent) { DiscussionHistoryEvent discussionHistoryEvent = (DiscussionHistoryEvent)event; System.out.println(String.format("groupchat discussion history message received[from '%s' at room '%s']: %s", discussionHistoryEvent.getEventObject().getNick(), discussionHistoryEvent.getRoomJid(), discussionHistoryEvent.getEventObject().getMessage())); } else if (event instanceof ChangeNickEvent) { ChangeNickEvent changeNickEvent = (ChangeNickEvent)event; System.out.println(String.format("user '%s'[sessions: %d] changed his nick[at room '%s']: %s", changeNickEvent.getEventObject().getOldNick(), changeNickEvent.getEventObject().getOldNickSessions(), changeNickEvent.getRoomJid(), changeNickEvent.getEventObject().getNewNick())); } else if (event instanceof RoomSubjectEvent) { RoomSubjectEvent roomSubjectEvent = (RoomSubjectEvent)event; if ("".equals(roomSubjectEvent.getEventObject().getSubject())) { System.out.println(String.format("there are no room subject in room '%s'", roomSubjectEvent.getRoomJid())); } else { System.out.println(String.format("room subject received[from '%s' in room '%s']: %s", roomSubjectEvent.getEventObject().getNick(), roomSubjectEvent.getRoomJid(), roomSubjectEvent.getEventObject().getSubject())); } } else if (event instanceof KickedEvent) { KickedEvent kickedEvent = (KickedEvent)event; System.out.println(String.format("you are kicked by '%s' from room '%s'. reason is '%s'", kickedEvent.getEventObject().getNick(), kickedEvent.getEventObject().getActor().getNick(), kickedEvent.getRoomJid(), kickedEvent.getEventObject().getReason())); } else if (event instanceof KickEvent) { KickEvent kickEvent = (KickEvent)event; System.out.println(String.format("'%s' is kicked from room '%s'", kickEvent.getEventObject().getNick(), kickEvent.getRoomJid())); } } }); 以下是RoomEvent对象列表。
| 对象类型 | eventObject类型 | 描述 |
|---|---|---|
| ChangeAvailabilityStatusEvent | ChangeAvailabilityStatus | 有聊天室用户修改了他的Presence状态 |
| ChangeNickEvent | ChangeNick | 有聊天室用户修改了他的昵称 |
| DiscussionHistoryEvent | RoomMessage | 进入聊天室时,会收到聊天室最近的聊天历史消息 |
| EnterEvent | Enter | 有新用户进入聊天室 |
| ExitEvent | Exit | 有用户退出了聊天室 |
| InvitationEvent | Invitation | 加入聊天室的邀请 |
| KickedEvent | Kicked | 用户自己被踢出了聊天室 |
| KickEvent | Kick | 有用户被踢出了聊天室 |
| PrivateMessageEvent | RoomMessage | 私聊消息 |
| RoomMessageEvent | RoomMessage | 有用户在聊天室发了消息 |
| RoomSubjectEvent | RoomSubject | 聊天室主题变更 |
Chalk的架构设计将系统分为两部分。
- 主程序框架
- 插件
在主程序框架中,灰色的框是系统的骨架,是系统中比较稳定的部分,虚线框的部分是系统中灵活及充满变化的地方。
这种灵活性和变化从何而来?因为XMPP被设计成一个内核稳定,但高度可扩展的协议,通过XML的namespace语义,我们可以在stanza(iq, message, presence)中添加任意的新协议元素,从而扩展XMPP协议。通过这样的方式,XMPP被扩展成一个庞大的协议族,仅公开的XEPs(XMPP Extension Protocols)就有近200个。
对应XMPP协议的设计原则,Chalk也将系统设计成稳定的框架+可扩展的插件子系统。框架部分封装了通讯细节和XMPP基础概念,插件子系统允许通过插件任意扩展系统的能力。
值得注意的是,系统的扩展和变化主要出现这几个地方。
- 协议定义
- 协议-协议对象的转化
- 协议逻辑处理
大部分情况下,我们希望插件和协议有清楚的映射关系。这意味着,我们希望尽量能够在一个插件中,封装一个或一组相关的XMPP协议。应该尽量避免将一个独立协议的逻辑,拆分在多个插件中。
为简化Plugin的开发,Chalk提供了IChatSystem和IChatServices,希望能够把Plugin和系统之间的联系,简化限制在这两个接口内。
IChatSystem允许Plugin将特定的扩展注册到系统当中,主要包括:
- 协议对象(Protocol Object)
- 协议-协议对象转换器(Parser & Translator)
- 插件的Api(Api)
- 插件Api的实现(Api Impl)
IChatServices封装了下层通讯细节及XMPP基础概念,插件的Api实现可以调用IChatServices提供的服务,处理协议的细节逻辑。
我们通过一个简单的案例,说明如何开发一个Chalk插件。
我们选择实现一个简单的协议XEP-0199(XMPP Ping)。
XMPP Ping是一个基于iq的协议,简单来说,我们需要处理以下的逻辑。
客户端向服务器端发送一个ping请求。
<iq from='juliet@capulet.lit/balcony' to='capulet.lit' id='c2s1' type='get'> <ping xmlns='urn:xmpp:ping'/> </iq> 如果服务器支持XMPP Ping协议,服务器返回pong响应。
<iq from='capulet.lit' to='juliet@capulet.lit/balcony' id='c2s1' type='result'/> 如果服务器不支持XMPP Ping协议,则返回错误。
<iq from='capulet.lit' to='juliet@capulet.lit/balcony' id='c2s1' type='error'> <ping xmlns='urn:xmpp:ping'/> <error type='cancel'> <service-unavailable xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/> </error> </iq> 为了便于逻辑处理,我们一般会对应网络上传输协议XML文档,设计一个Java业务对象,这样便于用业务类来进行处理。这个Java业务对象,我们称之为协议对象(Protocol Object)。
在本案例中,协议对象结构非常简单,类定义如下。
public class Ping { public static final Protocol PROTOCOL = new Protocol("urn:xmpp:ping", "ping"); } Ping类不包含任何信息,只是用类型来表示ping协议。
有了Protocol Object,引发了另外的问题,就是如何处理XMPP协议文档-Protocol Object之间的转换。
当我们收到网络上的XML协议文档,需要将其内容转换成一个Protocol Object实例。对应的,当我们发送一个Protocol Object时,需要将Protocol Object包含的协议信息,转换成对应的XMPP协议文档,再发送到网络上去。
Chalk用IParser和ITranslator来处理Protocol Object-XMPP协议文档之间的转换。IParser负责将一个XMPP协议文档转换成Protocol Object实例。ITranslator负责将一个Protocol Object实例翻译成对应的XMPP协议文档。
为了简化这些转换逻辑,Chalk使用Basalt项目提供的一个OXM(Protocol Object-XMPP Document Mapping)框架。在大部分情况下,我们并不需要为对象和XMPP协议文档之间的转换,编写逻辑代码。我们只需要选择系统内置的IParser和ITranslator实现。
因为Ping对象结构非常简单,我们可以选择使用SimpleObjectParser和SimpleObjectTranslator来处理对象和XML文档之间的转换。
Basalt是一个XMPP协议库,定义了基础的XMPP协议对象,并提供一个简单易用的OXM(Protocol Object-XMPP Document Mapping)框架。
最常用的IParser和ITranslator,是NamingConventionParser和NamingConventionTranslator,它们采用命名约定的方法,将Protocol Object的Field和XMPP协议文档中的Element做对应的拷贝。
关于Basalt OXM(Protocol Object-XMPP Document Mapping)框架的更多信息,请参考Basalt项目相关文档。
在理想的情况下,我们将Chalk的开发者分为两类:
- 插件开发者
- 应用开发者
插件开发者理解XMPP协议细节,以及Chalk的插件架构体系。他们将XEPs或者自定义的非标准协议,开发成对应的Plugin。
应用开发者不需要理解XMPP协议的细节,他们只是使用Chalk基础框架和选用插件,在此之上开发具体XMPP应用,例如IM,物联网应用等。
在现实中,开发者可能需要兼插件开发者和应用开发者。即使是这样的情况,在设计插件时,也应该遵循一些原则:
- 提供易于使用的Api,使得应用开发更简单、直观。
- 屏蔽XMPP协议的底层细节。
- 避免绑定具体的应用逻辑,提升插件的复用性。
插件开发者一个重要的任务,是设计良好的Api,给应用开发者调用。
在本案例中,PingPlugin提供以下的Api给应用开发者使用。
public interface IPing { public enum Result { PONG, // Server returned a pong. SERVICE_UNAVAILABLE, // Server doesn't support the protocol. TIME_OUT // Ping timed out. } /** * Send a ping request to server and waiting for result. */ Result ping(); /** * Set timeout for ping operation. */ void setTimeout(int timeout); /** * Get the ping timeout. */ int getTimeout(); } 基于IChatServices接口,有多种办法可以实现协议逻辑。
只要正确的注册Api和Api Impl,框架会在在Api Impl里自动注入IChatServices对象,所以我们总是可以获取到IChatServices来使用。
最常规的想法,我们向服务器端发送一个带id的ping消息,然后监控所有收到的信息,如果有一条相同id的消息返回,我们检查是pong还是server-unavailable,如果超过一定的时间还没收到响应消息,我们就返回超时。
如果是这样的思路,我们可以:
- 使用IIqService发送一个ping消息到服务器。
IIqService iqService = chatServices.getIqService(); Iq ping = new Iq(Iq.Type.GET); ping.setObject(new Ping()); String pingId = ping.getId(); iqService.send(ping); 注意,我们需要记录下发送的iq的id,便于后面找到服务器端对应的响应消息。
- 我们当然需要增加一个IIqListener,监听从服务器端来的消息,检查是否pong的响应。
iqService.addListener(new IIqListener() { public void received(Iq iq) { if (pingId.equals(iq.getId()) { // Received response from server } } }); - 我们还需要监听错误消息,检查服务器可能返回service-unavailable错误。
IErrorService errorService = chatServices.getErrorService(); errorService.addListener(new IErrorListener() { public void occurred(IError error) { if (pingId.equals(error.getId()) { if (error instanceof ServerUnavailable) { // Received server-unavailable error from server } } } }); - 我们当然还需要一个Timer定时器,来处理超时的情况。
Timer timer = new Timer(); timer.schedule(pingTimeoutTask, timeout); 这些处理看上去非常繁琐,Chalk提供了稍微简便一些的方法,我们可以使用SyncOperationTemplate类来简化一些代码逻辑。
以下是使用SyncOperationTemplate大概的代码逻辑。
public class PingImpl implements IPing { private IChatServices chatServices; private String id; private int timeout; public Result ping() { SyncOperationTemplate<Iq, IPing.Result> template = new SyncOperationTemplate<Iq, IPing.Result>(chatServices); try { return template.execute(new ISyncIqOperation<IPing.Result>() { public void trigger(IUnidirectionalStream<Iq> stream) { Iq iq = new Iq(Iq.Type.SET); iq.setObject(new Ping()); id = iq.getId(); stream.send(iq, timeout); } public boolean isErrorOccurred(StanzaError error) { if (id.equals(error.getId())) return true; return false; } public boolean isResultReceived(Iq iq) { if (id.equals(iq.getId())) return true; return false; } public Result processResult(Iq iq) { return IPing.Result.PONG; } }); } catch (ErrorException e) { if (e.getError().getDefinedCondition().equals(RemoteServerTimeout.DEFINED_CONDITION)) { return IPing.Result.TIME_OUT; } else { return IPing.Result.SERVICE_UNAVAILABLE; } } } } ... } 这里简化之处在于,我们可以在一个ISyncIqOperation内部类中处理所有逻辑,而不需要去访问IIqService,IIqListener,IErrorService,IErrorListener及Timer等诸多细节。
Legacy模式还是比较复杂,特别我们会注意到一个问题,我们总是需要在代码中跟踪相同id的消息,这似乎意味着跟踪id的处理,应该移交给框架去进行处理。
Chalk提供了Task模式的处理框架,可以避免我们琐碎的去跟踪相同id的消息,更加简化协议逻辑的处理。
注意,我们在IPing接口里,采用同步阻塞等待结果的方法,调用ping()方法后,程序会阻塞直到获得pong响应,或者接收到server-unavailable错误,或者等待超时返回。
在同步的情况下,最方便是使用ITaskService和ISyncTask接口。
public class PingImpl implements IPing { private IChatServices chatServices; private int timeout; ... public Result ping() { ITaskService taskService = chatServices.getTaskService(); try { return taskService.execute(new ISyncTask<Iq, IPing.Result>() { public void trigger(IUnidirectionalStream<Iq> stream) { Iq iq = new Iq(Iq.Type.SET); iq.setObject(new Ping()); stream.send(iq, timeout); } public Result processResult(Iq iq) { return IPing.Result.PONG; } }); } catch (ErrorException e) { if (e.getError().getDefinedCondition().equals(RemoteServerTimeout.DEFINED_CONDITION)) { return IPing.Result.TIME_OUT; } else { return IPing.Result.SERVICE_UNAVAILABLE; } } } ... } 可以看到,在Task模式下,框架默认监控相同id的消息,如果是接收到相同id的iq result,则回调processResult()方法进行处理。
如果服务器端返回相同id的错误,以及超时错误,都会统一封装成ErrorException,可以catch例外根据具体情况进行处理。
在实时消息系统中,Sync的场景会比较少,大多数场景下,等待来自服务器或联系人的消息,但是消息何时会来到,我们并不能预期。
在大多数情况下,我们应该使用Async Task而不是Sync Task,等消息到达的时候,触发回调方法。
ITaskService提供了执行Async Task的方法:
public interface ITaskService { ... void execute(ITask<?> task); ... } 关于Sync Task和Async Task的区别,还有一个值得注意的地方。所有的Sync Task的回调处理,都是在主消息接收线程里执行的,这意味着,如果有一个回调方法执行时,占用太多时间,会导致其它的Task被阻塞,有可能导致的一个结果是,应用程序一些业务被阻塞变慢。
当然我们可以在一些耗时的Sync Task回调方法里,启动新的线程,避免阻塞主消息接收线程。这是一个解决办法,但是我们有时候可能会容易忘记需要启动新线程。
Async Task采用了不同的处理方法,框架提供了一个线程池,每当接收到一个需要处理的Async Task回调时,系统会从线程池中启动一个线程,将回调逻辑放在新线程中去处理。这样,Async Task可以更好的避免系统阻塞变慢问题。
如果可能,应该尽可能的使用Async Task模式来处理协议的逻辑。
现在,我们已经处理好了所有的协议细节和业务逻辑,需要将所有的代码和逻辑,通过插件注册到系统中去。我们在上面已经提到过了,我们使用IChatSystem来帮助完成这项工作。
我们需要编写一个Plugin类,在本案例中,这个类是PingPlugin。
在本案例中PingPlugin中,我们需要:
- 注册协议的协议对象Ping,以及对应的Parser和Translator。
- 注册提供的Api接口IPing,以及IPing的具体实现PingImpl。
我们在Plugin类的init()方法里,注册插件给系统提供的扩展。在destroy()方法里,我们移除这些扩展。
PingPlugin的代码如下。
public class PingPlugin implements IPlugin { public void init(IChatSystem chatSystem, Properties properties) { chatSystem.registerParser( ProtocolChain.first(Iq.PROTOCOL).next(Ping.PROTOCOL), new SimpleObjectParserFactory<Ping>(Ping.PROTOCOL, Ping.class)); chatSystem.registerTranslator( Ping.class, new SimpleObjectTranslatorFactory<Ping>(Ping.class, Ping.PROTOCOL)); chatSystem.registerApi(IPing.class, PingImpl.class, properties); } public void destroy(IChatSystem chatSystem) { chatSystem.unregisterApi(IPing.class); chatSystem.unregisterTranslator(Ping.class); chatSystem.unregisterParser(ProtocolChain.first(Iq.PROTOCOL).next(Ping.PROTOCOL)); } } 插件已经开发完成,现在我们可以注册插件:
chatClient.register(PingPlugin.class); 创建Api:
IPing ping = chatClient.createApi(IPing.class); 并执行协议逻辑:
IPing.Result result = ping.ping(); PingPlugin是一个非常简单的插件,虽然它很简单,但是开发这样一个插件,依然需要完成一个完整的插件开发的过程。
这样一个简单而又完整的插件案例,是我们开发更复杂协议的起点。
如果要开发更复杂的协议,最好的办法就是阅读XMPP文档,以及阅读Chalk的代码。如果你对XMPP和开源充满热情,那现在就开始吧。
TBD
TBD
