有点三高,想北京空气实时监测测我的身体数据,有没有什么设备是可以储存历史数据的?

有可以远程监控父母老人的身体状况以应对突发状况的智能设备吗? - 知乎有问题,上知乎。知乎作为中文互联网最大的知识分享平台,以「知识连接一切」为愿景,致力于构建一个人人都可以便捷接入的知识分享网络,让人们便捷地与世界分享知识、经验和见解,发现更大的世界。80被浏览<strong class="NumberBoard-itemValue" title="2分享邀请回答104 条评论分享收藏感谢收起4添加评论分享收藏感谢收起→ 我们家的老人年纪大了人也是很胖的,现在有点三高
我们家的老人年纪大了人也是很胖的,现在有点三高
健康咨询描述:
我们家的老人年纪大了,人也是很胖的,现在有点三高,他这一次体检的时候有尿素氮
想得到怎样的帮助:尿素氮高这个有可能引起哪一方面的病
其他类似问题
医生回复区
爱分享229号
微信扫一扫,随时问医生
&&&&&&尿酸高容易导致痛风,所以饮食注意很关键。 &&&&&&维持标准的体重:若体重过重应慢慢减肥,不宜快速减肥或断食,以免因细胞大量崩解产生尿酸而导致痛风发作,减重以每月减轻1公斤以内为宜。&&&&&&在糖类方面:所有根茎类皆可食用,蔬菜类除干的香菇、紫菜不宜大量食用外,如豆芽、豆苗皆可食用,水果则无禁忌。&&&&&&在蛋白质方面:对含有高嘌呤的食物如内脏、鱼类(海参、海蜇皮除外),宜减少摄取。&&&&&&在油脂方面:由于高脂肪食物会抑制尿酸排泄,在急性痛风发作期避免大量使用。&&&&&&酒精:酒精在体内会代谢为乳影响尿酸排泄,并且本身会加速尿酸的形成,故患者须禁酒,尤其是啤酒最容易导致痛风发作,应绝对禁止。咖啡及茶则无限制,多喝水则可以促进尿酸排泄及预防尿酸路结石&&&&&&
( 专业名称:减肥瘦身 )
平均费用:1661元
肥胖一直是长期困扰年轻人,尤其是年轻女性的热点问题,目前世界上无论是发达国家还是发展中国家,肥胖率正以惊人的速度在增长,肥胖不仅影响形体美,...
手术项目:
减肥亲身体验
参考价格:5
参考价格:236
参考价格:70
您可能关注的问题
用药指导/吃什么药好
本品适用于:
1. 治疗骨关节炎急性期和慢性期...
参考价格:¥66
本品治疗痛风性关节炎的急性发作﹐预防复发性痛风性...
参考价格:¥5.4
您好,虽然我们的工作人员都在竭尽所能的改善网站,让大家能够非常方便的使用网站,但是其中难免有所疏漏,对您造成非常不必要的麻烦。在此,有问必答网向您表示深深的歉意,如果您遇到的麻烦还没有解决,您可以通过以下方式联系我们,我们会优先特殊解决您的问题。
请选择投诉理由
涉嫌广告宣传
无意义提问
非医学类咨询
违背伦理道德
其他投诉理由
涉嫌广告宣传
无意义回复
违背伦理道德
复制粘贴内容
常识性错误
其他投诉理由
如遇紧急情况,请致电400-没账号?一键数据猿账号
已有账号?
已有账号?
大数据快播
不容错过的资讯
大数据学堂
大数据企业推荐
大家都在搜使用Storm实现实时大数据分析!
发表于 16:56|
来源Dr.Dobb's|
作者Shruthi Kumar、Siddharth Patankar
摘要:随着数据体积的越来越大,实时处理成为了许多机构需要面对的首要挑战。Shruthi Kumar和Siddharth Patankar在Dr.Dobb’s上结合了汽车超速监视,为我们演示了使用Storm进行实时大数据分析。CSDN在此编译、整理。
简单和明了,Storm让大数据分析变得轻松加愉快。
当今世界,公司的日常运营经常会生成TB级别的数据。数据来源囊括了互联网装置可以捕获的任何类型数据,网站、社交媒体、交易型商业数据以及其它商业环境中创建的数据。考虑到数据的生成量,实时处理成为了许多机构需要面对的首要挑战。我们经常用的一个非常有效的开源实时计算工具就是 && Twitter开发,通常被比作&实时的Hadoop&。然而Storm远比Hadoop来的简单,因为用它处理大数据不会带来新老技术的交替。
Shruthi Kumar、Siddharth Patankar共同效力于Infosys,分别从事技术分析和研发工作。本文详述了Storm的使用方法,例子中的项目名称为&超速报警系统(Speeding Alert System)&。我们想实现的功能是:实时分析过往车辆的数据,一旦车辆数据超过预设的临界值 && 便触发一个trigger并把相关的数据存入数据库。
对比Hadoop的批处理,Storm是个实时的、分布式以及具备高容错的计算系统。同Hadoop一样Storm也可以处理大批量的数据,然而Storm在保证高可靠性的前提下还可以让处理进行的更加实时;也就是说,所有的信息都会被处理。Storm同样还具备容错和分布计算这些特性,这就让Storm可以扩展到不同的机器上进行大批量的数据处理。他同样还有以下的这些特性:
易于扩展。对于扩展,你只需要添加机器和改变对应的topology(拓扑)设置。Storm使用Hadoop Zookeeper进行集群协调,这样可以充分的保证大型集群的良好运行。
每条信息的处理都可以得到保证。
Storm集群管理简易。
Storm的容错机能:一旦topology递交,Storm会一直运行它直到topology被废除或者被关闭。而在执行中出现错误时,也会由Storm重新分配任务。
尽管通常使用Java,Storm中的topology可以用任何语言设计。
当然为了更好的理解文章,你首先需要安装和设置Storm。需要通过以下几个简单的步骤:
从Storm官方下载
将bin/directory解压到你的PATH上,并保证bin/storm脚本是可执行的。
Storm集群主要由一个主节点和一群工作节点(worker node)组成,通过 Zookeeper进行协调。
主节点通常运行一个后台程序 && Nimbus,用于响应分布在集群中的节点,分配任务和监测故障。这个很类似于Hadoop中的Job Tracker。
工作节点:
工作节点同样会运行一个后台程序 && Supervisor,用于收听工作指派并基于要求运行工作进程。每个工作节点都是topology中一个子集的实现。而Nimbus和Supervisor之间的协调则通过Zookeeper系统或者集群。
Zookeeper是完成Supervisor和Nimbus之间协调的服务。而应用程序实现实时的逻辑则被封装进Storm中的&topology&。topology则是一组由Spouts(数据源)和Bolts(数据操作)通过Stream Groupings进行连接的图。下面对出现的术语进行更深刻的解析。
简而言之,Spout从来源处读取数据并放入topology。Spout分成可靠和不可靠两种;当Storm接收失败时,可靠的Spout会对tuple(元组,数据项组成的列表)进行重发;而不可靠的Spout不会考虑接收成功与否只发射一次。而Spout中最主要的方法就是nextTuple(),该方法会发射一个新的tuple到topology,如果没有新tuple发射则会简单的返回。
Topology中所有的处理都由Bolt完成。Bolt可以完成任何事,比如:连接的过滤、聚合、访问文件/数据库、等等。Bolt从Spout中接收数据并进行处理,如果遇到复杂流的处理也可能将tuple发送给另一个Bolt进行处理。而Bolt中最重要的方法是execute(),以新的tuple作为参数接收。不管是Spout还是Bolt,如果将tuple发射成多个流,这些流都可以通过declareStream()来声明。
Stream Groupings:
Stream Grouping定义了一个流在Bolt任务间该如何被切分。这里有的6个Stream Grouping类型:
1. 随机分组(Shuffle grouping):随机分发tuple到Bolt的任务,保证每个任务获得相等数量的tuple。
2. 字段分组(Fields grouping):根据指定字段分割数据流,并分组。例如,根据&user-id&字段,相同&user-id&的元组总是分发到同一个任务,不同&user-id&的元组可能分发到不同的任务。
3. 全部分组(All grouping):tuple被复制到bolt的所有任务。这种类型需要谨慎使用。
4. 全局分组(Global grouping):全部流都分配到bolt的同一个任务。明确地说,是分配给ID最小的那个task。
5. 无分组(None grouping):你不需要关心流是如何分组。目前,无分组等效于随机分组。但最终,Storm将把无分组的Bolts放到Bolts或Spouts订阅它们的同一线程去执行(如果可能)。
6. 直接分组(Direct grouping):这是一个特别的分组类型。元组生产者决定tuple由哪个元组处理者任务接收。
当然还可以实现CustomStreamGroupimg接口来定制自己需要的分组。
当下情况我们需要给Spout和Bolt设计一种能够处理大量数据(日志文件)的topology,当一个特定数据值超过预设的临界值时促发警报。使用Storm的topology,逐行读入日志文件并且监视输入数据。在Storm组件方面,Spout负责读入输入数据。它不仅从现有的文件中读入数据,同时还监视着新文件。文件一旦被修改Spout会读入新的版本并且覆盖之前的tuple(可以被Bolt读入的格式),将tuple发射给Bolt进行临界分析,这样就可以发现所有可能超临界的记录。
下一节将对用例进行详细介绍。
这一节,将主要聚焦于临界值的两种分析类型:瞬间临界(instant thershold)和时间序列临界(time series threshold)。
瞬间临界值监测:一个字段的值在那个瞬间超过了预设的临界值,如果条件符合的话则触发一个trigger。举个例子当车辆超越80公里每小时,则触发trigger。
时间序列临界监测:字段的值在一个给定的时间段内超过了预设的临界值,如果条件符合则触发一个触发器。比如:在5分钟类,时速超过80KM两次及以上的车辆。
Listing One显示了我们将使用的一个类型日志,其中包含的车辆数据信息有:车牌号、车辆行驶的速度以及数据获取的位置。
North city
South city
South city
East &city
South city
West& city
这里将创建一个对应的XML文件,这将包含引入数据的模式。这个XML将用于日志文件的解析。XML的设计模式和对应的说明请见下表。
XML文件和日志文件都存放在Spout可以随时监测的目录下,用以关注文件的实时更新。而这个用例中的topology请见下图。
Figure 1:Storm中建立的topology,用以实现数据实时处理
如图所示:FilelistenerSpout接收输入日志并进行逐行的读入,接着将数据发射给ThresoldCalculatorBolt进行更深一步的临界值处理。一旦处理完成,被计算行的数据将发送给DBWriterBolt,然后由DBWriterBolt存入给数据库。下面将对这个过程的实现进行详细的解析。
Spout的实现
Spout以日志文件和XML描述文件作为接收对象。XML文件包含了与日志一致的设计模式。不妨设想一下一个示例日志文件,包含了车辆的车牌号、行驶速度、以及数据的捕获位置。(看下图)
Figure2:数据从日志文件到Spout的流程图
Listing Two显示了tuple对应的XML,其中指定了字段、将日志文件切割成字段的定界符以及字段的类型。XML文件以及数据都被保存到Spout指定的路径。
Listing Two:用以描述日志文件的XML文件。
&&&vehicle_number&string&&&speed&int&&&&location&string&&&,&& &
通过构造函数及它的参数Directory、PathSpout和TupleInfo对象创建Spout对象。TupleInfo储存了日志文件的字段、定界符、字段的类型这些很必要的信息。这个对象通过序列化XML时建立。
Spout的实现步骤:
对文件的改变进行分开的监听,并监视目录下有无新日志文件添加。
在数据得到了字段的说明后,将其转换成tuple。
声明Spout和Bolt之间的分组,并决定tuple发送给Bolt的途径。
Spout的具体编码在Listing Three中显示。
Listing Three:Spout中open、nextTuple和delcareOutputFields方法的逻辑。
public&void&open(&Map&conf,&TopologyContext&context,SpoutOutputCollector&collector&)& &{& &&&&&&&&&&&&_collector&=&& &&&&&&&&&&try& &&&&&&&&&&{& &&&&&&&&&&fileReader&&=&&new&BufferedReader(new&FileReader(new&File(file))); &&&&&&&&&&} &&&&&&&&&&catch&(FileNotFoundException&e) &&&&&&&&&&{ &&&&&&&&&&System.exit(1);& &&&&&&&&&&} &}&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& &&public&void&nextTuple() &{ &&&&&&&&&&protected&void&ListenFile(File&file) &&&&&&&&&&{ &&&&&&&&&&Utils.sleep(2000); &&&&&&&&&&RandomAccessFile&access&=&null; &&&&&&&&&&String&line&=&null;& &&&&&&&&&&&&&try& &&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&while&((line&=&access.readLine())&!=&null) &&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&&&if&(line&!=null) &&&&&&&&&&&&&&&&&&&&&{& &&&&&&&&&&&&&&&&&&&&&&&&&&String[]&fields=null; &&&&&&&&&&&&&&&&&&&&&&&&&&&if&(tupleInfo.getDelimiter().equals(&|&))&&fields&=&line.split(&\\&+tupleInfo.getDelimiter());& &&&&&&&&&&&&&&&&&&&&&&&&&&&else& &&&&&&&&&&&&&&&&&&&&&&&&&&&fields&=&line.split&&(tupleInfo.getDelimiter());& &&&&&&&&&&&&&&&&&&&&&&&&&&&if&(tupleInfo.getFieldList().size()&==&fields.length)&&_collector.emit(new&Values(fields)); &&&&&&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&&&&} &&&&&&&&&&&&&} &&&&&&&&&&&&&catch&(IOException&ex){&} &&&&&&&&&&&&&} &} &&public&void&declareOutputFields(OutputFieldsDeclarer&declarer) &{ &&&&&&&String[]&fieldsArr&=&new&String&[tupleInfo.getFieldList().size()]; &&&&&&&for(int&i=0;&i&tupleInfo.getFieldList().size();&i++) &&&&&&&{ &&&&&&&&&&&&&&&fieldsArr[i]&=&tupleInfo.getFieldList().get(i).getColumnName(); &&&&&&&} &declarer.declare(new&Fields(fieldsArr)); &}&&&& &
declareOutputFileds()决定了tuple发射的格式,这样的话Bolt就可以用类似的方法将tuple译码。Spout持续对日志文件的数据的变更进行监听,一旦有添加Spout就会进行读入并且发送给Bolt进行处理。
Bolt的实现
Spout的输出结果将给予Bolt进行更深一步的处理。经过对用例的思考,我们的topology中需要如Figure 3中的两个Bolt。
Figure 3:Spout到Bolt的数据流程。
ThresholdCalculatorBolt
Spout将tuple发出,由ThresholdCalculatorBolt接收并进行临界值处理。在这里,它将接收好几项输入进行检查;分别是:
临界值检查
临界值栏数检查(拆分成字段的数目)
临界值数据类型(拆分后字段的类型)
临界值出现的频数
临界值时间段检查
Listing Four中的类,定义用来保存这些值。
Listing Four:ThresholdInfo类
public&class&ThresholdInfo&implementsSerializable &&{&& &&&&&&&&&private&String&& &&&&&&&&&private&String&& &&&&&&&&&private&Object&thresholdV &&&&&&&&&private&int&thresholdColN& &&&&&&&&&private&Integer&timeW& &&&&&&&&&private&int&frequencyOfO& &}& &
基于字段中提供的值,临界值检查将被Listing Five中的execute()方法执行。代码大部分的功能是解析和接收值的检测。
Listing Five:临界值检测代码段
public&void&execute(Tuple&tuple,&BasicOutputCollector&collector)& &{ &&&&&if(tuple!=null)& &&&&&{ &&&&&&&&&List&Object&&inputTupleList&=&(List&Object&)&tuple.getValues(); &&&&&&&&&int&thresholdColNum&=&thresholdInfo.getThresholdColNumber();& &&&&&&&&&Object&thresholdValue&=&thresholdInfo.getThresholdValue();& &&&&&&&&&String&thresholdDataType&=&tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();& &&&&&&&&&Integer&timeWindow&=&thresholdInfo.getTimeWindow(); &&&&&&&&&&int&frequency&=&thresholdInfo.getFrequencyOfOccurence(); &&&&&&&&&&if(thresholdDataType.equalsIgnoreCase(&string&)) &&&&&&&&&&{ &&&&&&&&&&&&&&String&valueToCheck&=&inputTupleList.get(thresholdColNum-1).toString(); &&&&&&&&&&&&&&String&frequencyChkOp&=&thresholdInfo.getAction(); &&&&&&&&&&&&&&if(timeWindow!=null) &&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&long&curTime&=&System.currentTimeMillis(); &&&&&&&&&&&&&&&&&&long&diffInMinutes&=&(curTime-startTime)/(1000); &&&&&&&&&&&&&&&&&&if(diffInMinutes&=timeWindow) &&&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&&&&if(frequencyChkOp.equals(&==&)) &&&&&&&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&&&&&&&&&if(valueToCheck.equalsIgnoreCase(thresholdValue.toString())) &&&&&&&&&&&&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&count.incrementAndGet(); &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&if(count.get()&&&frequency) &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&splitAndEmit(inputTupleList,collector); &&&&&&&&&&&&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&&&&&&&&&&else&if(frequencyChkOp.equals(&!=&)) &&&&&&&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&&&&&&&&if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString())) &&&&&&&&&&&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&count.incrementAndGet(); &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&if(count.get()&&&frequency) &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&splitAndEmit(inputTupleList,collector); &&&&&&&&&&&&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&&&&&&&&&&&else&&&&&&&&&&&&&&&&&&&&&&&&&System.out.println(&Operator&not&supported&);& &&&&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&&&} &&&&&&&&&&&&&&&else&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&if(frequencyChkOp.equals(&==&)) &&&&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&&&&&if(valueToCheck.equalsIgnoreCase(thresholdValue.toString())) &&&&&&&&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&&&&&&&&&count.incrementAndGet(); &&&&&&&&&&&&&&&&&&&&&&&&&&&if(count.get()&&&frequency) &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&splitAndEmit(inputTupleList,collector); &&&&&&&&&&&&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&&&&&&&else&if(frequencyChkOp.equals(&!=&)) &&&&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&&&&&&if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString())) &&&&&&&&&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&&&&&&&&&&count.incrementAndGet(); &&&&&&&&&&&&&&&&&&&&&&&&&&&&if(count.get()&&&frequency) &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&splitAndEmit(inputTupleList,collector); &&&&&&&&&&&&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&&&&} &&&&&&&&&&&&&} &&&&&&&&&&&&&else&if(thresholdDataType.equalsIgnoreCase(&int&)&||&&&&&&&&&&&&&&&&&&&&&thresholdDataType.equalsIgnoreCase(&double&)&||&&&&&&&&&&&&&&&&&&&&&thresholdDataType.equalsIgnoreCase(&float&)&||&&&&&&&&&&&&&&&&&&&&&thresholdDataType.equalsIgnoreCase(&long&)&||&&&&&&&&&&&&&&&&&&&&&thresholdDataType.equalsIgnoreCase(&short&)) &&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&String&frequencyChkOp&=&thresholdInfo.getAction(); &&&&&&&&&&&&&&&&&if(timeWindow!=null) &&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&&&&long&valueToCheck&=&&&&&&&&&&&&&&&&&&&&&&&&&&Long.parseLong(inputTupleList.get(thresholdColNum-1).toString()); &&&&&&&&&&&&&&&&&&&&&&long&curTime&=&System.currentTimeMillis(); &&&&&&&&&&&&&&&&&&&&&&long&diffInMinutes&=&(curTime-startTime)/(1000); &&&&&&&&&&&&&&&&&&&&&&System.out.println(&Difference&in&minutes=&+diffInMinutes); &&&&&&&&&&&&&&&&&&&&&&if(diffInMinutes&=timeWindow) &&&&&&&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&&&&&&&&&if(frequencyChkOp.equals(&&&)) &&&&&&&&&&&&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&if(valueToCheck&&&Double.parseDouble(thresholdValue.toString())) &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&count.incrementAndGet(); &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&if(count.get()&&&frequency) &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&splitAndEmit(inputTupleList,collector); &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&&&&&&&&&&&&&&&else&if(frequencyChkOp.equals(&&&)) &&&&&&&&&&&&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&if(valueToCheck&&&Double.parseDouble(thresholdValue.toString())) &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&count.incrementAndGet(); &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&if(count.get()&&&frequency) &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&splitAndEmit(inputTupleList,collector); &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&&&&&&&&&&&&&&&&else&if(frequencyChkOp.equals(&==&)) &&&&&&&&&&&&&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&if(valueToCheck&==&Double.parseDouble(thresholdValue.toString())) &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&count.incrementAndGet(); &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&if(count.get()&&&frequency) &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&splitAndEmit(inputTupleList,collector); &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&&&&&&&&&&&&&&&&else&if(frequencyChkOp.equals(&!=&)) &&&&&&&&&&&&&&&&&&&&&&&&&&&&{ &&&&&.&.&. &&&&&&&&&&&&&&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&&} &&&&&&&else&&&&&&&&&&&splitAndEmit(null,collector); &&&&&&&} &&&&&&&else&&&&&&{ &&&&&&&&&&&&System.err.println(&Emitting&null&in&bolt&); &&&&&&&&&&&&splitAndEmit(null,collector); &&&&&} &}&
经由Bolt发送的的tuple将会传递到下一个对应的Bolt,在我们的用例中是DBWriterBolt。
DBWriterBolt
经过处理的tuple必须被持久化以便于触发tigger或者更深层次的使用。DBWiterBolt做了这个持久化的工作并把tuple存入了数据库。表的建立由prepare()函数完成,这也将是topology调用的第一个方法。方法的编码如Listing Six所示。
Listing Six:建表编码。
public&void&prepare(&Map&StormConf,&TopologyContext&context&)& &{&&&&&&& &&&&&try& &&&&&{ &&&&&&&&&Class.forName(dbClass); &&&&&}& &&&&&catch&(ClassNotFoundException&e)& &&&&&{ &&&&&&&&&System.out.println(&Driver&not&found&); &&&&&&&&&e.printStackTrace(); &&&&&} && &&&&&try& &&&&&{ &&&&&&&&connection&driverManager.getConnection(& &&&&&&&&&&&&&jdbc:mysql://&+databaseIP+&:&+databasePort+&/&+databaseName,&userName,&pwd); &&&&&&&&connection.prepareStatement(&DROP&TABLE&IF&EXISTS&&+tableName).execute(); && &&&&&&&&StringBuilder&createQuery&=&new&StringBuilder( &&&&&&&&&&&&&CREATE&TABLE&IF&NOT&EXISTS&&+tableName+&(&); &&&&&&&&for(Field&fields&:&tupleInfo.getFieldList()) &&&&&&&&{ &&&&&&&&&&&&if(fields.getColumnType().equalsIgnoreCase(&String&)) &&&&&&&&&&&&&&&&createQuery.append(fields.getColumnName()+&&VARCHAR(500),&); &&&&&&&&&&&&else&&&&&&&&&&&&&&&&createQuery.append(fields.getColumnName()+&&&+fields.getColumnType()+&,&); &&&&&&&&} &&&&&&&&createQuery.append(&thresholdTimeStamp&timestamp)&); &&&&&&&&connection.prepareStatement(createQuery.toString()).execute(); && &&&&&&&&&&&&&&&&StringBuilder&insertQuery&=&new&StringBuilder(&INSERT&INTO&&+tableName+&(&); &&&&&&&&String&tempCreateQuery&=&new&String(); &&&&&&&&for(Field&fields&:&tupleInfo.getFieldList()) &&&&&&&&{ &&&&&&&&&&&&&insertQuery.append(fields.getColumnName()+&,&); &&&&&&&&} &&&&&&&&insertQuery.append(&thresholdTimeStamp&).append(&)&values&(&); &&&&&&&&for(Field&fields&:&tupleInfo.getFieldList()) &&&&&&&&{ &&&&&&&&&&&&insertQuery.append(&?,&); &&&&&&&&} && &&&&&&&&insertQuery.append(&?)&); &&&&&&&&prepStatement&=&connection.prepareStatement(insertQuery.toString()); &&&&&} &&&&&catch&(SQLException&e)& &&&&&{&&&&&&& &&&&&&&&&e.printStackTrace(); &&&&&}&&&&&&& &} &
数据分批次的插入数据库。插入的逻辑由Listting Seven中的execute()方法提供。大部分的编码都是用来实现可能存在不同类型输入的解析。
Listing Seven:数据插入的代码部分。
public&void&execute(Tuple&tuple,&BasicOutputCollector&collector)& &{ &&&&&batchExecuted=false; &&&&&if(tuple!=null) &&&&&{ &&&&&&&&List&#60;Object&#62;&inputTupleList&=&(List&#60;Object&#62;)&tuple.getValues(); &&&&&&&&int&dbIndex=0; &&&&&&&&for(int&i=0;i&#60;tupleInfo.getFieldList().size();i++) &&&&&&&&{ &&&&&&&&&&&&Field&field&=&tupleInfo.getFieldList().get(i); &&&&&&&&&&&&try&{ &&&&&&&&&&&&&&&&dbIndex&=&i+1; &&&&&&&&&&&&&&&&if(field.getColumnType().equalsIgnoreCase(&String&))&&&&&&&&&&&&& &&&&&&&&&&&&&&&&&&&&prepStatement.setString(dbIndex,&inputTupleList.get(i).toString()); &&&&&&&&&&&&&&&&else&if(field.getColumnType().equalsIgnoreCase(&int&)) &&&&&&&&&&&&&&&&&&&&prepStatement.setInt(dbIndex, &&&&&&&&&&&&&&&&&&&&&&&&Integer.parseInt(inputTupleList.get(i).toString())); &&&&&&&&&&&&&&&&else&if(field.getColumnType().equalsIgnoreCase(&long&)) &&&&&&&&&&&&&&&&&&&&prepStatement.setLong(dbIndex,& &&&&&&&&&&&&&&&&&&&&&&&&Long.parseLong(inputTupleList.get(i).toString())); &&&&&&&&&&&&&&&&else&if(field.getColumnType().equalsIgnoreCase(&float&)) &&&&&&&&&&&&&&&&&&&&prepStatement.setFloat(dbIndex,& &&&&&&&&&&&&&&&&&&&&&&&&Float.parseFloat(inputTupleList.get(i).toString())); &&&&&&&&&&&&&&&&else&if(field.getColumnType().equalsIgnoreCase(&double&)) &&&&&&&&&&&&&&&&&&&&prepStatement.setDouble(dbIndex,& &&&&&&&&&&&&&&&&&&&&&&&&Double.parseDouble(inputTupleList.get(i).toString())); &&&&&&&&&&&&&&&&else&if(field.getColumnType().equalsIgnoreCase(&short&)) &&&&&&&&&&&&&&&&&&&&prepStatement.setShort(dbIndex,& &&&&&&&&&&&&&&&&&&&&&&&&Short.parseShort(inputTupleList.get(i).toString())); &&&&&&&&&&&&&&&&else&if(field.getColumnType().equalsIgnoreCase(&boolean&)) &&&&&&&&&&&&&&&&&&&&prepStatement.setBoolean(dbIndex,& &&&&&&&&&&&&&&&&&&&&&&&&Boolean.parseBoolean(inputTupleList.get(i).toString())); &&&&&&&&&&&&&&&&else&if(field.getColumnType().equalsIgnoreCase(&byte&)) &&&&&&&&&&&&&&&&&&&&prepStatement.setByte(dbIndex,& &&&&&&&&&&&&&&&&&&&&&&&&Byte.parseByte(inputTupleList.get(i).toString())); &&&&&&&&&&&&&&&&else&if(field.getColumnType().equalsIgnoreCase(&Date&)) &&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&Date&dateToAdd=null; &&&&&&&&&&&&&&&&&&&if&(!(inputTupleList.get(i)&instanceof&Date))&& &&&&&&&&&&&&&&&&&&&{&& &&&&&&&&&&&&&&&&&&&&&&&&DateFormat&df&=&new&SimpleDateFormat(&yyyy-MM-dd&hh:mm:ss&); &&&&&&&&&&&&&&&&&&&&&&&&try& &&&&&&&&&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&&&&&&&&&&dateToAdd&=&df.parse(inputTupleList.get(i).toString()); &&&&&&&&&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&&&&&&&&&&&&catch&(ParseException&e)& &&&&&&&&&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&&&&&&&&&&&&&&&&System.err.println(&Data&type&not&valid&); &&&&&&&&&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&&&&&&&&}&& &&&&&&&&&&&&&&&&&&&&else&&&&&&&&&&&&&&&&&&&&{ &&&&&&&&&&&&&dateToAdd&=&(Date)inputTupleList.get(i); &&&&&&&&&&&&&java.sql.Date&sqlDate&=&new&java.sql.Date(dateToAdd.getTime()); &&&&&&&&&&&&&prepStatement.setDate(dbIndex,&sqlDate); &&&&&&&&&&&&&}&&& &&&&&&&&&&&&&}& &&&&&&&&&catch&(SQLException&e)& &&&&&&&&&{ &&&&&&&&&&&&&&e.printStackTrace(); &&&&&&&&&} &&&&&} &&&&&Date&now&=&new&Date();&&&&&&&&&& &&&&&try&&&&&{ &&&&&&&&&prepStatement.setTimestamp(dbIndex+1,&new&java.sql.Timestamp(now.getTime())); &&&&&&&&&prepStatement.addBatch(); &&&&&&&&&counter.incrementAndGet(); &&&&&&&&&if&(counter.get()==&batchSize)& &&&&&&&&&executeBatch(); &&&&&}& &&&&&catch&(SQLException&e1)& &&&&&{ &&&&&&&&&e1.printStackTrace(); &&&&&}&&&&&&&&&&& &&&&} &&&&else&&&&{ &&&&&&&&&long&curTime&=&System.currentTimeMillis(); &&&&&&&&long&diffInSeconds&=&(curTime-startTime)/(60*1000); &&&&&&&&if(counter.get()&#60;batchSize&&&&diffInSeconds&#62;batchTimeWindowInSeconds) &&&&&&&&{ &&&&&&&&&&&&&try&{ &&&&&&&&&&&&&&&&&executeBatch(); &&&&&&&&&&&&&&&&&startTime&=&System.currentTimeMillis(); &&&&&&&&&&&&&} &&&&&&&&&&&&&catch&(SQLException&e)&{ &&&&&&&&&&&&&&&&&&e.printStackTrace(); &&&&&&&&&&&&&} &&&&&&&&} &&&&} &} && &public&void&executeBatch()&throws&SQLException &{ &&&&&batchExecuted=true; &&&&&prepStatement.executeBatch(); &&&&&counter&=&new&AtomicInteger(0); &}&
一旦Spout和Bolt准备就绪(等待被执行),topology生成器将会建立topology并准备执行。下面就来看一下执行步骤。
在本地集群上运行和测试topology
通过TopologyBuilder建立topology。
使用Storm Submitter,将topology递交给集群。以topology的名字、配置和topology的对象作为参数。
提交topology。
Listing Eight:建立和执行topology。
public&class&StormMain &{ &&&&&&public&static&void&main(String[]&args)&throws&AlreadyAliveException,& &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&InvalidTopologyException,& &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&InterruptedException& &&&&&&{ &&&&&&&&&&&ParallelFileSpout&parallelFileSpout&=&new&ParallelFileSpout(); &&&&&&&&&&&ThresholdBolt&thresholdBolt&=&new&ThresholdBolt(); &&&&&&&&&&&DBWriterBolt&dbWriterBolt&=&new&DBWriterBolt(); &&&&&&&&&&&TopologyBuilder&builder&=&new&TopologyBuilder(); &&&&&&&&&&&builder.setSpout(&spout&,&parallelFileSpout,&1); &&&&&&&&&&&builder.setBolt(&thresholdBolt&,&thresholdBolt,1).shuffleGrouping(&spout&); &&&&&&&&&&&builder.setBolt(&dbWriterBolt&,dbWriterBolt,1).shuffleGrouping(&thresholdBolt&); &&&&&&&&&&&if(this.argsMain!=null&&&&this.argsMain.length&&#62;&0)& &&&&&&&&&&&{ &&&&&&&&&&&&&&&conf.setNumWorkers(1); &&&&&&&&&&&&&&&StormSubmitter.submitTopology(& &&&&&&&&&&&&&&&&&&&&this.argsMain[0],&conf,&builder.createTopology()); &&&&&&&&&&&} &&&&&&&&&&&else&&&&&&&&&&&{&&&& &&&&&&&&&&&&&&&Config&conf&=&new&Config(); &&&&&&&&&&&&&&&conf.setDebug(true); &&&&&&&&&&&&&&&conf.setMaxTaskParallelism(3); &&&&&&&&&&&&&&&LocalCluster&cluster&=&new&LocalCluster(); &&&&&&&&&&&&&&&cluster.submitTopology( &&&&&&&&&&&&&&&&Threshold_Test&,&conf,&builder.createTopology()); &&&&&&&&&&&} &&&&&&} &}&
topology被建立后将被提交到本地集群。一旦topology被提交,除非被取缔或者集群关闭,它将一直保持运行不需要做任何的修改。这也是Storm的另一大特色之一。
这个简单的例子体现了当你掌握了topology、spout和bolt的概念,将可以轻松的使用Storm进行实时处理。如果你既想处理大数据又不想遍历Hadoop的话,不难发现使用Storm将是个很好的选择。
原文链接:&(编译/仲浩 王旭东/审校)
欢迎关注微博,了解更多云信息。
本文为CSDN编译整理,未经允许不得转载。如需转载请联系
推荐阅读相关主题:
CSDN官方微信
扫描二维码,向CSDN吐槽
微信号:CSDNnews
相关热门文章}

我要回帖

更多关于 北京空气实时监测 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信