很多数据开发者使用bitmap技术对用户数据进行编码和压缩,然后利用bitmap的与/或/非的极速处理速度,实现类似用户画像标签的人群筛选、运营分析的7日活跃等分析。
本文给出了一个使用MaxCompute MapReduce开发一个对不同日期活跃用户ID进行bitmap编码和计算的样例。供感兴趣的用户进一步了解、分析,并应用在自己的场景下。
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Iterator;public class bitmapDemo2
{public static class BitMapper extends MapperBase {Record key;Record value;@Overridepublic void setup(TaskContext context) throws IOException {key = context.createMapOutputKeyRecord();value = context.createMapOutputValueRecord();}@Overridepublic void map(long recordNum, Record record, TaskContext context)throws IOException{RoaringBitmap mrb=new RoaringBitmap();long AID=0;{{{{AID=record.getBigint("id");mrb.add((int) AID);//获取keykey.set(new Object[] {record.getString("active_date")});}}}}ByteBuffer outbb = ByteBuffer.allocate(mrb.serializedSizeInBytes());mrb.serialize(new DataOutputStream(new OutputStream(){ByteBuffer mBB;OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}public void close() {}public void flush() {}public void write(int b) {mBB.put((byte) b);}public void write(byte[] b) {mBB.put(b);}public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}}.init(outbb)));String serializedstring = Base64.getEncoder().encodeToString(outbb.array());value.set(new Object[] {serializedstring});context.write(key, value);}}public static class BitReducer extends ReducerBase {private Record result = null;public void setup(TaskContext context) throws IOException {result = context.createOutputRecord();}public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {long fcount = 0;RoaringBitmap rbm=new RoaringBitmap();while (values.hasNext()){Record val = values.next();ByteBuffer newbb = ByteBuffer.wrap(Base64.getDecoder().decode((String)val.get(0)));ImmutableRoaringBitmap irb = new ImmutableRoaringBitmap(newbb);RoaringBitmap p= new RoaringBitmap(irb);rbm.or(p);}ByteBuffer outbb = ByteBuffer.allocate(rbm.serializedSizeInBytes());rbm.serialize(new DataOutputStream(new OutputStream(){ByteBuffer mBB;OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}public void close() {}public void flush() {}public void write(int b) {mBB.put((byte) b);}public void write(byte[] b) {mBB.put(b);}public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}}.init(outbb)));String serializedstring = Base64.getEncoder().encodeToString(outbb.array());result.set(0, key.get(0));result.set(1, serializedstring);context.write(result);}}public static void main( String[] args ) throws OdpsException{System.out.println("begin.........");JobConf job = new JobConf();job.setMapperClass(BitMapper.class);job.setReducerClass(BitReducer.class);job.setMapOutputKeySchema(SchemaUtils.fromString("active_date:string"));job.setMapOutputValueSchema(SchemaUtils.fromString("id:string"));InputUtils.addTable(TableInfo.builder().tableName("bitmap_source").cols(new String[] {"id","active_date"}).build(), job);
// +------------+-------------+
// | id | active_date |
// +------------+-------------+
// | 1 | 20190729 |
// | 2 | 20190729 |
// | 3 | 20190730 |
// | 4 | 20190801 |
// | 5 | 20190801 |
// +------------+-------------+OutputUtils.addTable(TableInfo.builder().tableName("bitmap_target").build(), job);
// +-------------+------------+
// | active_date | bit_map |
// +-------------+------------+
// 20190729,OjAAAAEAAAAAAAEAEAAAAAEAAgA=3D
// 20190730,OjAAAAEAAAAAAAAAEAAAAAMA
// 20190801,OjAAAAEAAAAAAAEAEAAAAAQABQA=3DJobClient.runJob(job);}
}
对Java应用打包后,上传到MaxCompute项目中,即可在MaxCompute中调用该MR作业,对输入表的数据按日期作为key进行用户id的编码,同时按照相同日期对bitmap后的用户id取OR操作(根据需要可以取AND,例如存留场景),并将处理后的数据写入目标结构表当中供后续处理使用。
原文链接
本文为云栖社区原创内容,未经允许不得转载。