阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

MetaQ集群安装测试

169次阅读
没有评论

共计 5153 个字符,预计需要花费 13 分钟才能阅读完成。

1,ZooKeeper 集群安装,可以参考 ZooKeeper 集群环境搭建实践 http://www.linuxidc.com/Linux/2013-04/83562.htm

2,下载 https://github.com/killme2008/Metamorphosis/tree/metamorphosis-all-1.4.6.2,如果不想自己编译可以直接下载 http://fnil.net/downloads/index.html,我这里选择自己编译,主要是以后如果出现问题自己可以修改其源码,重新编译

3,maven 编译,maven 环境自己搜索配置好,下载 all 项目后需要编译其子项目 metamorphosis-server-wrapper。dos 环境进入其目录下 mvn eclipse:eclipse,完成后导入到 eclipse, 用 eclipse 插件编译。或者直接 dos 该目录下执行 mvn clean install -Dmaven.test.skip=true。完成后 target 目录下生产其 jar 包;

可以在工程创建 lib 文件夹,输入以下命令:mvn dependency:copy-dependencies -DoutputDirectory=lib(不加 DoutputDirectory 会默认输出到 targed/dependency 下)。再把 install 的 jar 包也 copy 到 lib 下。

4,完成编译后上传到服务器

需要修改 conf/server.ini 文件

[system]brokerId=2

numPartitions=1

serverPort=8123

ashboardHttpPort=8120

unflushThreshold=0

unflushInterval=10000

maxSegmentSize=1073741824

maxTransferSize=1048576

deletePolicy=delete,168

deleteWhen=0 0 6,18 * * ?

flushTxLogAtCommit=1

stat=true

dataPath=/data1/metaq/data

dataLogPath=/data1/metaq/log

[zookeeper]

zk.zkConnect=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181

zk.zkSessionTimeoutMs=30000

zk.zkConnectionTimeoutMs=30000

zk.zkSyncTimeMs=5000

;; Topics section

[topic=test]

[topic=meta-test]

 

 

 

集群的话需要修改上面标红部分,brokerId 保证每个服务器节点上不一样就行

dataPath,dataLogPath 如果自己制定,需要每台服务器 mkdir

分发到个节点,在每台节点的 bin 下都执行 metaServer.sh start

需要停止时执行 metaServer.sh stop

查看状态 sh metaServer.sh status

5,应用例子

package com.test.metaq;

import Java.util.concurrent.Executor;

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;

public class AsyncConsum {

 public static void main(String[] args) {
  final MetaClientConfig metaClientConfig = new MetaClientConfig(); 
        final ZKConfig zkConfig = new ZKConfig(); 
        zkConfig.zkConnect = “10.168.140.48:2181”; 
        metaClientConfig.setZkConfig(zkConfig); 
        MessageSessionFactory sessionFactory = null;
  try {
  sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
  } catch (MetaClientException e) {
  // TODO Auto-generated catch block
  e.printStackTrace();
  } 
        final String topic = “test”; 
        final String group = “meta-example”; 
        MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group)); 
     
        try {
  consumer.subscribe(topic, 1024 * 1024, new MessageListener() {
      public void recieveMessages(Message message) {
          System.out.println(“Receive message ” + new String(message.getData())); 
      } 
      public Executor getExecutor() { 
          return null; 
      } 
  });
  consumer.completeSubscribe();
  } catch (MetaClientException e) {
  // TODO Auto-generated catch block
  e.printStackTrace();
  } 
 }

}

 

 

package com.test.metaq;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.SendResult;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;

public class Products {

 public static void main(String[] args) {
  final MetaClientConfig metaClientConfig = new MetaClientConfig();
  final ZKConfig zkConfig = new ZKConfig();
  zkConfig.zkConnect = “10.168.140.48:2181”;
  metaClientConfig.setZkConfig(zkConfig);
 
  MessageSessionFactory sessionFactory = null;
  try {
  sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
  } catch (MetaClientException e) {
  e.printStackTrace();
  }
 
  MessageProducer producer = sessionFactory.createProducer();
  final String topic = “test”;
  producer.publish(topic);
  BufferedReader reader = new BufferedReader(new InputStreamReader(
    System.in));
  String line = “qiujinyong”;
  try {
  while ((line = reader.readLine()) != null) {
    SendResult sendResult = producer.sendMessage(new Message(topic,
      line.getBytes()));
    if (!sendResult.isSuccess()) {
    System.err.println(“Send message failed,error message:”
      + sendResult.getErrorMessage());
    } else {
    System.out.println(“Send message successfully,sent to “
      + sendResult.getPartition());
    }
  }
  } catch (IOException e) {
  // TODO Auto-generated catch block
  e.printStackTrace();
  } catch (MetaClientException e) {
  // TODO Auto-generated catch block
  e.printStackTrace();
  } catch (InterruptedException e) {
  // TODO Auto-generated catch block
  e.printStackTrace();
  }

 }

}

打包 test.jar 后,传服务器上 java -cp test.jar com.test.metaq.Products 命令行输入 message

打包 test.jar 后,传服务器上 java -cp test.jar com.test.metaq.AsyncConsum 命令行会接收到 message

ZooKeeper 的详细介绍 :请点这里
ZooKeeper 的下载地址 :请点这里

相关阅读

ZooKeeper 集群配置 http://www.linuxidc.com/Linux/2013-06/86348.htm

使用 ZooKeeper 实现分布式共享锁 http://www.linuxidc.com/Linux/2013-06/85550.htm

分布式服务框架 ZooKeeper — 管理分布式环境中的数据 http://www.linuxidc.com/Linux/2013-06/85549.htm

ZooKeeper 集群环境搭建实践 http://www.linuxidc.com/Linux/2013-04/83562.htm

ZooKeeper 服务器集群环境配置实测 http://www.linuxidc.com/Linux/2013-04/83559.htm

ZooKeeper 集群安装 http://www.linuxidc.com/Linux/2012-10/72906.htm

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计5153字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中