
HBase (key design): conditional queries, sorting, and paging

Paging is a very common use case for web sites and many other applications.  In relational databases, this is easily implemented with LIMIT and OFFSET, or by selecting the row number in the query and adding conditions based on its value.  HBase 0.19.x, on the other hand, does not provide any queries or filters that support paging directly.  After a quick example using SQL, I will show how to implement the same functionality in HBase.

Let’s assume that we have a large number of users.  Each user has performed a number of actions.  Each action has a unique identifier, a timestamp, and a name.

This is how you might get the third page of an individual user’s actions using SQL:

SELECT id, name, stamp FROM actions WHERE userid = 1
ORDER BY stamp DESC LIMIT 10 OFFSET 20;

To run efficiently, this query relies on secondary indexes on both userid and stamp; since id is the primary key, that means at least three indexes on this table.  Though this is a simple query to write, you will run into problems as the actions table grows to millions of rows and beyond.  Insertions would look like:

INSERT INTO actions (id, userid, name, stamp) VALUES (newid(), 1, 'Joe User', epoch());

HBase has no real indexes.  Rows are stored in sorted order, and columns in a family are sorted.  For more information, read the HBase Architecture page on the HBase Wiki.

Keeping in mind the primary queries we will run on user actions, we will design an HBase table to support paging queries on per-user, time-ordered lists of actions.

We will use the Java Client API for HBase, specifically the HTable class.  What we are looking for are two methods:

public static List<Action> getUserActions(int userid, int offset, int limit)
public static void putUserAction(Action action)

Please note, I am using a custom object, Action, for simplicity.  It is a client-side holder for the four action fields (id, userid, name, stamp).

There are a number of ways to store your data in HBase that will allow the getUserActions query, but in this case we will go with a very tall table design (lots of rows with few columns in them) rather than a wide one (lots of columns in each row).  Specifically, the difference here would be whether you have a row-per-action or a row-per-user.  We will do a row-per-action, but will design our row key (the primary key) as a composite key to allow for grouping and sorting of actions, rather than using just the action id.  This means we will not have random access to an action by its id, so rather than defining this as the actions table (which might also exist if you needed random access by action id) we will define it as the useractions table, and we will only store a single column in a single family, content:name.

The row key that we will use in our HBase useractions table is:

<userid><reverse_order_stamp><actionid>

It’s very important that each of these fields is fixed-length and binary so that the lexicographical/ascending byte-ordering of HBase will properly sort our rows.
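To make the ordering requirement concrete, here is a minimal stand-alone sketch that compares two encodings of a user id byte by byte.  It uses only the JDK (java.nio.ByteBuffer, which is big endian by default) rather than the HBase Bytes class, and the class and method names (KeyWidthSketch, textKey, binaryKey, compareBytes) are hypothetical names chosen for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class KeyWidthSketch {
    // Unsigned, byte-by-byte lexicographic comparison (the ordering described above).
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) return diff;
        }
        return a.length - b.length;
    }

    // Variable-length decimal text: "9", "10", "100", ...
    static byte[] textKey(int userid) {
        return Integer.toString(userid).getBytes(StandardCharsets.UTF_8);
    }

    // Fixed-length 4-byte big-endian integer.
    static byte[] binaryKey(int userid) {
        return ByteBuffer.allocate(4).putInt(userid).array();
    }

    public static void main(String[] args) {
        // As text, "10" sorts before "9" because '1' (0x31) < '9' (0x39).
        System.out.println(compareBytes(textKey(10), textKey(9)) < 0);     // prints true
        // As fixed-width binary, 0x0000000A sorts after 0x00000009.
        System.out.println(compareBytes(binaryKey(10), binaryKey(9)) > 0); // prints true
    }
}
```

Note that this ordering only matches numeric order for non-negative values; a negative int has its high bit set and would sort after every positive one.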

The userid field will be a 4 byte, big endian integer.  reverse_order_stamp is an 8 byte, big endian long with a value of (Long.MAX_VALUE - epoch).  This is so the most recent stamp is at the top rather than the bottom.  actionid is another 4 byte, big endian integer.  Thankfully, HBase provides helpful utilities in the org.apache.hadoop.hbase.util.Bytes class to deal with this (unfortunately it lacked some key features in 0.19, so the code below makes use of the Bytes class available in 0.20/TRUNK).  Before we get into HBase code, let’s define the helper methods makeActionRow and readActionRow to deal with the composite key:

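// Requires: import org.apache.hadoop.hbase.util.Bytes;
// Builds the 16-byte composite key.  The stamp passed in must already be
// reversed (Long.MAX_VALUE - epoch); readActionRow undoes the reversal.
public static byte [] makeActionRow(int userid, long stamp, int actionid)
throws Exception {
  byte [] useridBytes = Bytes.toBytes(userid);       // 4 bytes, big endian
  byte [] stampBytes = Bytes.toBytes(stamp);         // 8 bytes, big endian
  byte [] actionidBytes = Bytes.toBytes(actionid);   // 4 bytes, big endian
  return Bytes.add(useridBytes, stampBytes, actionidBytes);
}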
 
public static Action readActionRow(byte [] row)
throws Exception {
  // Bytes.toInt(byte [] buf, int offset, int length)
  int userid = Bytes.toInt(row,0,4);
  long stamp = Long.MAX_VALUE - Bytes.toLong(row,4,8);
  int actionid = Bytes.toInt(row,12,4);
  return new Action(userid,stamp,actionid);
}
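If you want to check the effect of the reversed stamp without an HBase cluster, the following is a minimal sketch under a few assumptions: it uses java.nio.ByteBuffer instead of the Bytes class, takes the plain epoch value and reverses it inside makeKey, and assumes non-negative user ids and stamps.  The names CompositeKeySketch, makeKey, readEpoch, and compareKeys are hypothetical and exist only for this illustration.

```java
import java.nio.ByteBuffer;

class CompositeKeySketch {
    // Pack <userid><reverse_order_stamp><actionid> into 16 big-endian bytes.
    static byte[] makeKey(int userid, long epoch, int actionid) {
        return ByteBuffer.allocate(16)
                .putInt(userid)
                .putLong(Long.MAX_VALUE - epoch)  // newer stamps become smaller values
                .putInt(actionid)
                .array();
    }

    // Recover the original epoch stamp from bytes 4..11 of the key.
    static long readEpoch(byte[] key) {
        return Long.MAX_VALUE - ByteBuffer.wrap(key).getLong(4);
    }

    // Unsigned lexicographic comparison of two keys.
    static int compareKeys(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) return diff;
        }
        return a.length - b.length;
    }

    public static void main(String[] args) {
        byte[] older = makeKey(1, 1000L, 7);
        byte[] newer = makeKey(1, 2000L, 8);
        byte[] otherUser = makeKey(2, 5000L, 9);

        // Within one user, the more recent action sorts first.
        System.out.println(compareKeys(newer, older) < 0);     // prints true
        // All of user 1's rows sort before any of user 2's rows.
        System.out.println(compareKeys(older, otherUser) < 0); // prints true
        // The reversal is lossless.
        System.out.println(readEpoch(newer));                  // prints 2000
    }
}
```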

Now that we can deal with the composite keys, insertion is very straightforward:

public static void putUserAction(Action action) throws Exception {
  // Get the fields from the Action object
  int userid = action.getUserID();
  long stamp = Long.MAX_VALUE - action.getStamp();
  int actionid = action.getID();
  String name = action.getName();
 
  // Build the composite row, column, and value
  byte [] row = makeActionRow(userid,stamp,actionid);
  byte [] column = Bytes.toBytes("content:name");
  byte [] value = Bytes.toBytes(name);
 
  // Insert to HBase
  HTable ht = new HTable("useractions");
  BatchUpdate bu = new BatchUpdate(row);
  bu.put(column,value);
  ht.commit(bu);
}

We simply serialize the fields into the composite row key and write the single column to HBase in a BatchUpdate.  Reading involves deserializing those fields and using a Scanner.  In addition to matching on the content:name column, we will also specify a startRow and stopRow so that the Scanner only returns rows for the user we are looking at.  This way our code does not have to detect when the results move on to the next user; the Scanner will just stop.

public static List<Action> getUserActions(int userid, int offset, int limit)
throws Exception {
  // Initialize counter and List to return
  int count = 0;
  List<Action> actions = new ArrayList<Action>(limit);
 
  // Initialize startRow, stopRow, and columns to match
  byte [] startRow = makeActionRow(userid,0,0);
  byte [] stopRow = makeActionRow(userid,Long.MAX_VALUE,Integer.MAX_VALUE);
  byte [][] columns = {Bytes.toBytes("content:name")};
 
  // Open Scanner
  HTable ht = new HTable("useractions");
  Scanner s = ht.getScanner(columns,startRow,stopRow);
  try {
    RowResult res = null;

    // Iterate until the page is full or this user's rows run out
    while(actions.size() < limit && (res = s.next()) != null) {
      // Skip rows that come before the offset
      if(++count <= offset) continue;

      // Get data from RowResult
      byte [] row = res.getRow();
      byte [] value = res.get(columns[0]).getValue();

      // Build Action
      Action action = readActionRow(row);
      String name = Bytes.toString(value);
      action.setName(name);
      actions.add(action);
    }
  } finally {
    // Always release the Scanner, even if a read fails
    s.close();
  }
  return actions;
}
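To experiment with the range scan and the offset/limit logic without a running cluster, here is a small in-memory sketch.  It is not HBase code: a java.util.TreeMap ordered by unsigned byte comparison stands in for the sorted useractions table, and subMap (start inclusive, stop exclusive) stands in for the Scanner range.  PagingSketch and all of its methods are hypothetical names used only for this illustration.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PagingSketch {
    // Unsigned lexicographic comparison of row keys.
    static int compareKeys(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) return diff;
        }
        return a.length - b.length;
    }

    // In-memory stand-in for the sorted table: row key -> action name.
    static TreeMap<byte[], String> newTable() {
        return new TreeMap<>(PagingSketch::compareKeys);
    }

    // <userid><reverse_order_stamp><actionid>; the stamp is already reversed here.
    static byte[] makeKey(int userid, long reversedStamp, int actionid) {
        return ByteBuffer.allocate(16)
                .putInt(userid).putLong(reversedStamp).putInt(actionid).array();
    }

    static void put(TreeMap<byte[], String> table, int userid, long epoch,
                    int actionid, String name) {
        table.put(makeKey(userid, Long.MAX_VALUE - epoch, actionid), name);
    }

    static List<String> getUserActions(TreeMap<byte[], String> table,
                                       int userid, int offset, int limit) {
        byte[] startRow = makeKey(userid, 0L, 0);
        byte[] stopRow = makeKey(userid, Long.MAX_VALUE, Integer.MAX_VALUE);
        List<String> page = new ArrayList<>();
        int count = 0;
        for (Map.Entry<byte[], String> e : table.subMap(startRow, stopRow).entrySet()) {
            if (page.size() >= limit) break;  // page is full
            if (++count <= offset) continue;  // still before the requested page
            page.add(e.getValue());
        }
        return page;
    }

    public static void main(String[] args) {
        TreeMap<byte[], String> table = newTable();
        for (int i = 1; i <= 5; i++) {
            put(table, 1, 1000L * i, i, "action-" + i);
        }
        put(table, 2, 9999L, 6, "other-user");

        System.out.println(getUserActions(table, 1, 0, 2)); // prints [action-5, action-4]
        System.out.println(getUserActions(table, 1, 2, 2)); // prints [action-3, action-2]
        System.out.println(getUserActions(table, 1, 4, 2)); // prints [action-1]
    }
}
```

As in the HBase version, every request reads and discards the first offset rows, so deep pages cost more than early ones.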

The storage of your data must be tied to how you need to query it.  Without a sophisticated query engine or indexing capabilities, you must design to take advantage of sorted rows and columns, potentially designing a table per query type.  Denormalization is okay!
