序:Sawzall的论文早在2006年就发表了,后来Google又推出了Tenzing,Dremel等数据分析系统,到了2010年就把Sawzall给开源了,项目主页:
http://code.google.com/p/szl/。与Tenzing,Dremel相比, Sawzall所能做的事情还是比较有限,但是作为一种DSL,毕竟还是要比直接写MapReduce job要更易用些。本文就简单描述下其原理使用及扩展方法,转载请注明:
作者:phylips@bmy 2012-7-24
1. Szl使用
1.1 简介
安装完szl后,用户就可以使用szl命令了,szl是一个可执行程序,可以用来在本地执行sawzall程序对输入数据进行处理。可以以文本文件为输入,也可以以recordio格式为输入。输入数据可以是简单的以分隔符分割的记录,也可以是保存在recordio中的proto序列化数据。szl的工作模式就像经典的awk,用户可以用它来对数据进行统计分析。
参考src/app/szl.cc的实现,用户可以实现符合自己使用场景的脚本执行框架,对自己的数据进行处理,比如可以将其移植到自己的MapReduce环境中。
1.2 实例
1.2.1 文本格式输入处理
输入数据:
1,2
2,2
2,3
2,4
3,5
3,6
Szl程序:
t: table set(100)[int] of int;
fields: array of bytes = splitcsvline(input);
index: int = int(string(fields[0]),10);
value: int = int(string(fields[1]),10);
emit t[index] <- value;
命令:
szl b.szl data -table_output t
1.2.2 protobuf格式输入处理
1.2.2.1 从proto生成szl类型定义
执行命令:protoc --plugin=/home/.../szl-read-only/src/protoc-gen-szl --szl_out=. page_meta.proto
出现错误:/home/.../szl-read-only/src/protoc-gen-szl: error while loading shared libraries: libicui18n.so.48: cannot open shared object file: No such file or directory
解决方案:安装icu,sudo yum install icu
设置LD_LIBRARY_PATH重新运行如下命令:
LD_LIBRARY_PATH=/usr/local/lib protoc --plugin=/home/duanple/peile.duan/szl-read-only/src/protoc-gen-szl --szl_out=. page_meta.proto
可以看到在当前目录下有一个page_meta.szl的输出文件
通过如上命令可以将proto定义直接转换成szl类型定义。当然实际中,这个过程是被szl隐藏的,szl自己会在内部调用protoc和protoc-gen-szl,完成这个转换。通过szl的help信息可以看到,它有如下两个选项用来设置这两个工具的路径:-protocol_compiler和-protocol_compiler_plugin。
在生成proto对应的szl类型定义后,用户可以在其szl程序中直接include,来取代proto,如下所示。
1.2.2.2 Proto格式输入
Proto定义
message PageMeta {
optional bytes UrlHash = 1;
/** meta fields from crawler. */
optional bytes RawUrl = 2; //also for piece_data
optional bytes FinalUrl = 3;
}
protoc page_meta.proto --cpp_out=./
输入数据生成
为了能够进行实验,首先我们需要一组输入数据,在这里我们利用utilities/recordio.cc中的RecordWriter类写入proto记录。通过这个实验,再结合src/app/szl.cc我们就可以比较清楚的了解到szl的整个执行原理。
#include "page_meta.pb.h"
using namespace std;
#include "src/public/recordio.h"
using namespace sawzall;
int main()
{
PageMeta meta;
meta.set_rawurl("http://www.baidu.com");
string outputFile = "data";
string outputRecord = meta.SerializeAsString();
RecordWriter* writer = RecordWriter::Open(outputFile.c_str());
writer->Write(outputRecord.c_str(), outputRecord.size());
writer->Write(outputRecord.c_str(), outputRecord.size());
writer->Write(outputRecord.c_str(), outputRecord.size());
delete writer;
return 0;
}
g++ proto_writer.cpp page_meta.pb.cc -lprotobuf -lpthread –lszl
LD_LIBRARY_PATH=/usr/local/lib ./a.out
这样就会在当前文件夹下生成一个data文件
Szl程序
统计Record个数
proto "page_meta.proto"
meta: PageMeta = input;
count: table sum of int;
emit count <- 1;
命令
LD_LIBRARY_PATH=/usr/local/lib szl proto.szl --protocol_compiler=/usr/bin/protoc --protocol_compiler_plugin==/home/.../szl-read-only/src/protoc-gen-szl --table_output=count -use_recordio data
执行结果
count[] = 3
Szl程序
统计RawUrl字段总长度
proto "page_meta.proto"
meta: PageMeta = input;
count: table sum of int;
emit count <- len(string(meta.rawurl));
执行结果
count[] = 60
Szl程序
使用include取代proto语句
include "page_meta.szl"
meta: PageMeta = input;
count: table sum of int;
emit count <- len(string(meta.rawurl));
执行结果
count[] = 60
2. Sawzall框架原理
Sawzall的核心概念有两个:记录处理,聚合。即一次处理一条记录,将结果进行聚合。
2.1 核心类
SzlEmitter,EmitterFactory,SzlTabWriter,SzlTabEntry,SzlResults,Process
Process 以sawzall源程序为输入,通过Process::set_emitter_factory(DemoEmitterFactory*)和sawzall::RegisterEmitters(&process)设置好emitters,然后通过process.Run(input[i].data(), input[i].size(), NULL, 0)来对每条输入数据进行处理。处理结果需要通过SzlEmitter. Flusher()写出。
在这里最核心的是Emitter,Emitter负责响应对table的”<-“操作,在src/public/emitterinterface.h,我们可以看到其接口定义,注释已经写的比较清楚,可以将其看做是针对”emit”语句的一系列响应动作,针对每个元素,会有三个过程:
1) Begin (<appropriate compound type>, <length>)
2) PutX
3) End(<appropriate compound type>, <length>)
一个emit语句会包含如下几个组成部分:Index,Element,Weight。比如如下语句将会产生如下对应调用序列:
// emit table[1] <- { "foo": 1, "bar": 0 };
// Begin(EMIT, 1)
// Begin(INDEX, 1) PutInt(1) End(INDEX, 1)
// Begin(ELEMENT, 1)
// Begin(MAP, 2)
// PutString("foo", 3) PutInt(1)
// PutString("bar", 3) PutInt(0)
// End(MAP, 2)
// End(ELEMENT, 1)
// End(EMIT, 1)
参考下src/emitvalues/szlemitter.cc实现,除了继承自Emitter的接口外,SzlEmitter新增了Merge,DisplayResults,Flusher,Clear等几个新接口。此外还包含了一些重要的变量:SzlTabWriter* writer,SzlTabEntryMap* table_等。也就是说它除了负责对emit进行响应外,同时还会在内存中保存emit操作后的结果,以及将这些数据写出。
SzlTabEntryMap实际上是hash_map<string, SzlTabEntry*>。SzlTabEntry实际上就是table内的一个value, SzlEmitter内部会调用SzlTabWriter来创建和修改SzlTabEntry,而一个SzlTabEntry也可能是一个复合结构,同时它往往与SzlTabWriter是成对出现的,实际上真正的聚合器逻辑就是通过SzlTabEntry来实现的,src/emitters/目录下就包含了一系列常用的聚合器实现。
再进一步的看,sawzall从外部看来可以支持很多table类型,table本身也可以是多维的,每维的数据类型也是多样的,通过分析SzlEmitter::Begin,End,PutX,可知实际上在内部它会通过encoder将多个维度上的index值组合成一个key,也就是说内部看到的只有一个key,这一点通过SzlTabEntryMap也可以看出。
2.2 模块结构
Szl的src目录下有如下一些文件夹:
app:包含szl本地化工具的相关实现,app/tests/目录下还包含了一个MapReduce的实现实例
contrib:内含Emacs的一个插件
emitters:一系列聚合器实现
emitvalues:sawzall相关一些基本类型定义,包括decoder,emitter,encoder,tableentry等
engine:sawzall语言相关,包括词法分析,语法分析,执行引擎
fmt:格式化输出相关
intrinsics:一些常见的运算支持
protoc_plugin:protobuf相关工具
public:公开的头文件
utilities:基本工具类
3. 将Sawzall应用于MapReduce环境
3.1 mapreduce_demo_unittest.cc
位于src/app/tests/mapreduce_demo_unittest.cc,对于一个MapReduce程序来说,整个处理过程涉及到如下几个对象:sawzall源程序,输入数据,MapReduce框架,驱动程序。
对于整个数据处理过程来说,sawzall源程序和输入数据都是输入,驱动程序负责读取数据和sawzall源程序,将sawzall源程序进行动态的编译,然后将该编译后程序针对每条输入记录执行一遍,table的数据是sawzall源程序的输出窗口,每条记录执行时都会去修改table的数据,该table会始终保留这些修改。而table的内容会在调用SzlEmitter::Flusher()时被写出去,该函数会通过调用SzlEmitter::WriteValue(const string& key, const string& value)将SzlTabEntryMap中的所有key,value写出去,因此用户可以通过实现自己的WriteValue函数就可以控制table数据如何写出。
Table中的数据如果是聚合器类型,需要支持Merge操作,相互会进行Merge。
3.2 执行过程
首先来考虑下如果将sawzall应用到MapReduce环境中,大概应该怎么实现。首先需要sawzall语言的支持,这个通过Process类可以实现,然后用户程序只需要读出一条条的记录,然后将它交给Process,Process会通过调用Run 处理每条记录,所以Process就像一个执行环境,首先它需要负责保存好记录执行后的table数据,同时它还要能够将table数据传给用户程序,比如随着处理的进行,table中的key,value个数逐渐增多,因此用户就需要将数据Flush出去,同时清空table,然后继续处理避免内存耗尽。写出去的数据是key,value的形式,同时这些key,value数据还要能够进行排序,reducer会重新将他们读取出来,同时需要将相同key下的进行reduce,对于reduce后的结果进行显示或保存。
在这个过程中可能会有如下问题:如何将SzlEmitter与table关联?如何得到sawzall执行后的table数据?如何对Table中数据进行序列化反序列化?
首先Map端负责读取数据和sawzall程序的解析,对于sawzall脚本中的每个table,在整个过程中应该有一个SzlEmitter对象实例与之相对应,在table的定义处,就可以根据table名称,创建出该SzlEmitter对象。然后每读出一条记录,就将它喂给sawzall执行引擎,该引擎内部会执行sawzall的处理逻辑,这些逻辑底层会调用SzlEmitter的相关函数,并更新其内部数据。那么如何根据table定义,创建出对应的SzlEmitter对象呢?Process有个函数set_emitter_factory,可以设置SzlEmitterFactory,而SzlEmitterFactory有个函数NewEmitter,该函数可以根据TableInfo,调用SzlTabWriter::CreateSzlTabWriter创建出对应的SzlTabWriter,然后再以该SzlTabWriter为参数创建出SzlEmitter。这样创建出的SzlEmitter中的SzlTabWriter就是针对给定的table的了。
要了解sawzall中的table是如何与SzlTabWriter关联上的就需要查看CreateSzlTabWriter的实现了,具体在src/emitvalues/szltabentry.cc,可以看到它是通过一个全局静态变量creators来根据table名称比如”sum”找到对应的SzlTabWriter,而另一方面我们看src/emitters/szlmaximum.cc,里面有REGISTER_SZL_TAB_WRITER(maximum, SzlMaximum);该宏会将相应SzlTabWriter的构造函数与其名称相关联。
Map端可以通过SzlEmitter::GetMemoryUsage得到当前内存使用情况,并判断是否调用SzlEmitter::Flusher。但是对于不同的table可能需要选择不同的处理方式,比如如果只是”stdout <- “,就需要每次执行都做输出,但是如果是”sum <-”只需要最后输出即可,内部如何对这两种情况进行控制呢?当然stdout这个比较特殊,它可能本身并不属于一个table类型,不会将数据存入内存。但是对于sum和collection,这种区别可能就比较明显了,sum只是要一个最终的结果,collection则需要收集所有见过的数据,因此flush的频率也就是不同的。当多个table出现时,如何控制flush的频率可能是个需要权衡的地方。当然table内数据何时写出以及如何写出,都是由外部框架负责的,跟sawzall已经没有了关系。
3.3 需要做的工作
综上分析,我们总结下,将sawzall用于MapReduce环境,大概需要做这样几个工作:
· 实现一个Emiter,重写WriteValue方法
· 由于一个sawzall程序可能会用到多个table,因此在序列化时,需要在key,value中包含table信息,这样才能在读出时识别出它属于哪个table
· 编写驱动程序,Map端负责读取数据,sawzall程序,构建sawzall执行环境,输出table内容,Reduce端负责读取table内容,并完成聚合,输出结果
· 具体实现可以参考src/app/szl.cc和src/app/tests/mapreduce_demo_unittest.cc
3.4 伪代码
Mapper
InitializeAllModules();
sawzall::RegisterStandardTableTypes();
sawzall::Executable exe(program_name.c_str(), source.c_str(), sawzall::kNormal);
sawzall::Process process(&exe, false, NULL);
DemoEmitterFactory emitter_factory(result, num_shards);
process.set_emitter_factory(&emitter_factory);
sawzall::RegisterEmitters(&process);
process.Initialize();
for (int i = 0; i < num_input_lines; i++) {
process.Run(input[i].data(), input[i].size(), NULL, 0)
}
// Flush the emitter output to the mapper output shards.
for (int i = 0; i < emitter_factory.emitters().size(); i++) {
SzlEmitter* emitter = emitter_factory.emitters()[i];
emitter->Flusher();
}
Reducer
for (int i = 0; i < reducer_input.size(); i++) {
const string& name_key = reducer_input[i].first;
const vector<string>& values = reducer_input[i].second;
size_t separator_index = name_key.find(kSzlKeyValueSep);
string name = name_key.substr(0,separator_index);
string key = name_key.substr(separator_index+1);
map<string,SzlTabWriter*>::iterator it = tabwriters.find(name);
const SzlTabWriter* tw = it->second;
// Create the tabwriter and tabentry for the key & value.
SzlTabEntry* te = tw->CreateEntry(key);
if (tw->Aggregates()) {
// For aggregating tables, first merge the values.
for (int j = 0; j < values.size(); j++) {
SzlTabEntry::MergeStatus status = te->Merge(values[j]);
if (status == SzlTabEntry::MergeError)
LOG(FATAL) << "error merging results";
}
string value;
te->Flush(&value);
// Write the output to the mill
result.push_back(KeyValuePair(name_key,value));
} else {
// Non-aggregating tables.
if (tw->WritesToMill()) {
// Just write the value directly into the mill.
for (int j = 0; j < values.size(); j++) {
result.push_back(KeyValuePair(name_key,values[j]));
}
} else {
// Direct output table - let the table write the value.
for (int j = 0; j < values.size(); j++) {
te->Write(values[j]);
}
}
}
delete te;
}
4. 相关工作
以目前的眼光来看sawzall,它存在如下一些问题:项目活跃度不够,相关的讨论极少,很久已无更新,目前来看已经是比较老的查询系统了,不知道Google内部是否还在用,但是已知的是在sawzall之后Google已经开发出了一些新的系统完成类似工作,如FlumeJava,Dremel,Tenzing,当然对于一些比较复杂的统计需求来说sawzall还是有其优势的;Sawzall本身更偏向于统计分析,基本上都是只读性的操作;对输入数据格式支持有限,目前内置支持文本或protobuf格式;开源出的版本缺少MapReduce支持,当然这也是Google的MapReduce未开源导致的,所以并没有真正开放出MapReduce环境下的代码,基本上如不做修改只能作为一个单机程序来使用;与手写MapReduce相比,性能会有些下降。当然作为最早的封装于MapReduce之上的脚步式编程语言,这项工作还是具有很大意义的,它大大简化了MapReduce的编写过程。后来的Pig,Hive都参考了它,尤其是Pig更与之类似,都是过程性的,语法都比较符合程序员的使用习惯,与类SQL的HQL则差异较大。
Sawzall实际上已经有比较长的历史了,2003年就已开始应用在Google内部,2006年相关论文发表,2010年开源。值得一提的是其作者之一是Rob Pike,著名的Unix先驱,在贝尔实验室最早和Ken Thompson以及 Dennis M. Ritche 参与Unix开发,UTF-8的设计人,经典书籍The Unix Programming Environment 和 The Practice of Programming 的作者,Google在2009年推出的Go语言就是出自他和Ken Thompson等人之手。
5. 参考文献