2011年10月23日 星期日

從 Android Open Source Project 下載 Android 原始碼

前一陣子因為受到 kernel.org 被駭的牽連,Android Open Source Project (AOSP) 的 git repository 也關閉了一段時間。大約二天之前 AOSP 回來了,不過網址有些許的變動,我就稍微記錄一下下載流程,以免日後忘記要怎麼弄。

先準備必要的工具


要下載 AOSP 的程式碼,你至少需要 curl、git、python 這三個程式。在 Debian 或 Ubuntu 上面你只要使用:

$ sudo apt-get install curl git python

就可以把它們弄到手。接下來我們要先設定 git:

$ git config --global user.name 你的名字
$ git config --global user.email 你的email

下載 repo 版本管理工具


接下來我們必需下載 repo 版本管理工具,我們可以在家目錄之下建立一個資料夾,並把這個資料夾加到 $PATH 裡面:

$ mkdir ~/bin
$ export PATH=~/bin:$PATH
# 備註:你可以把上面這行加到你的 .bashrc,這樣以後要用 repo 的時候就可以直接用。

然後下載 repo 這個工具:

$ curl https://dl-ssl.google.com/dl/googlesource/git-repo/repo > ~/bin/repo
$ chmod +x ~/bin/repo

如果你做到這裡沒有遇到問題就可以開始下載程式碼。如果你遇到 SSL certificate problem 之類的問題可以看下面的常見問題。

開始下載 Android 程式碼


接下來就是這篇文章的重點:下載 Android 程式碼。首先我們先建立一個用來放置 Android 程式碼的資料夾:

$ mkdir ~/android-src
$ cd ~/android-src

初始化 repo 相關的設定:

$ repo init -u https://android.googlesource.com/platform/manifest

接著 repo 會問你一些問題,通常使用預設值就可以了。最後就是按下:

$ repo sync
# 備註:你可以加上 -j2 或 -j4 平行下載。

之後經過漫長的等待,你就會有一份完整的 Android 程式碼

下一步:你可以參考另一篇文章「編譯 Android AOSP」編譯整個 Android 系統。



--------------------------------------------------

常見問題


為什麼會一直停在 Receiving Objects 且網路毫無動靜?


有時候可能因為 TCP/IP 的實作可能遇上一些意外的問題,所以會一直卡在「Receiving Objects」,可以用以下指令調整 TCP 協定的設定:

$ sudo sysctl -w net.ipv4.tcp_window_scaling=0

然後在 repo sync 的時候,只使用單執行緒:

$ repo sync -j1

為什麼會遇到 SSL certificate problem 之類的問題?


如果你遇到 SSL certificate problem 之類的問題,是因為你的系統提供的 curl 沒有內建 cacert 清冊 (也就是 https 憑證發行單位的清冊),curl 沒有辦法驗證 dl-ssl.google.com 的憑證是否正確。

你必需先行下載 cacert.pem。因為 cacert.pem 是防止中間人攻擊的重要機制,所以請務必要確保他的正確性,你可以使用 sha1sum 來檢查:

$ curl  http://curl.haxx.se/ca/cacert.pem --ipv4 > ~/cacert.pem
$ sha1sum ~/cacert.pem
286c4a22fe2ed9e3b2d958e2800a8c16d867f063  cacert.pem

然後再使用以下指令下載 repo 這個工具:

$ curl --cacert ~/cacert.pem https://dl-ssl.google.com/dl/googlesource/git-repo/repo > ~/bin/repo

另外,我們要讓 git 使用這個清冊驗證 Google 的 repository,所以我們要執行以下指令:

$ git config --global http.sslcainfo ~/cacert.pem

參考資料

  1. cURL CA Extract
  2. Android Open Source Project: Download the Source Tree

2011年9月13日 星期二

C++ Iterator 與 Lambda Function

今天寫程式碼的時候遇到一個問題:要怎麼讓一個存有指標的陣列依照指標指向的內容做排序呢?例如該怎麼作才可以讓下面的程式碼依序印出 1~7 呢?

int main() {
  int a[] = { 5, 4, 1, 2, 7, 3, 6 };

  vector<int *> b;
  b.push_back(&a[6]);
  b.push_back(&a[5]);
  b.push_back(&a[2]);
  b.push_back(&a[0]);
  b.push_back(&a[4]);
  b.push_back(&a[1]);
  b.push_back(&a[3]);

  for (size_t i = 0; i < b.size(); ++i) {
    cout << *b[i] << endl;
  }

  return EXIT_SUCCESS;
}

我們一開始可能會想要用 <algorithm> 裡面的 sort,不過很可惜,下面的程式碼一定是錯的:

#include <algorithm>

int main() {
  // ... 略 ...

  sort(b.begin(), b.end());

  for (size_t i = 0; i < b.size(); ++i) {
    cout << *b[i] << endl;
  }

  return EXIT_SUCCESS;
}

這是因為在這種情況下,sort 函式比較的是位址的大小,而不是每一個 Pointer 指向的 int 的值。而且令人遺憾的是我們不能重載 Pointer Type。當然最簡單的解決方法是:
struct int_ptr_less_than_comparator {
public:
  bool operator()(int const *pa, int const *pb) const {
    return *pa < *pb;
  }
};

int main() {
  // ... 略 ... 

  sort(b.begin(), b.end(), int_ptr_less_than_comparator());

  for (size_t i = 0; i < b.size(); ++i) {
    cout << *b[i] << endl;
  }
 
  return EXIT_SUCCESS;
}

我們直接寫一個比較函式 (函式物件),然後把它作為 sort 函式的第三個參數。這樣一來結果是對了,不過感覺可以寫得更為一般化。首先我們可以觀察到 int_ptr_less_than_comparator 是由二個部分組成:(1) 先對 pa 與 pb 取值 (dereference) (2) 比較二者的大小。其中,第二部分和 std::less 的功能是一樣的。而第一部分可以寫一個叫作 dereference_to 的函式物件來解決:

#include <functional>

template <typename T>
struct dereference_to : public std::unary_function<T *, T &> {
public:
  T &operator()(T *ptr) const {
    return *ptr;
  }
};


這個 dereference_to 看起來很複雜,不過不要被他嚇到了!它只是一個帶有 operator() 的 class template。舉例來說,如果我們把 int 代入 T:

struct dereference_to<int>
: public std::unary_function<int *, int &> {
public:
  int &operator()(int *ptr) const {
    return ptr;
  }
}

這樣如果我們忽略 std::unary_function,剩下的應該都不難理解,簡單的說,就是我們可以宣告一個 dereference_to<int> 物件,然後透過其 operator() 運算子幫我們 dereference operator() 的第一個參數。例如:

int main() {
  int c = 5;
  dereference_to<int> dref;
  cout << dref(&c) << endl; // 印出 5
}


所以我們現在有二種函式物件:std::less 與 dereference_to,可是我們還需要一個方法幫我們把 dereference_to 與 std::less 組裝成 int_ptr_less_than_comparator。如果你的編譯器附有非標準的 compose2,你就可以用以下程式碼「組合」出 int_ptr_less_than_comparator:

compose2(less<int>(), dereference_to<int>(), dereference_to<int>())

可是 compose2 畢竟不是標準的一部分,所以我們要自己寫一份:

template <typename F, typename G, typename H>
struct binary_composer
: public binary_function<typename G::argument_type,
                         typename H::argument_type,
                         typename F::result_type>
{
private:
  F f;
  G g;
  H h;

public:
  // 這些 typedef 是要讓 Type Traits 機制可以正常運作。一開始看不懂沒有關係。
  // 只要依樣畫葫蘆即可!
  typedef typename G::argument_type first_argument_type;
  typedef typename H::argument_type second_argument_type;
  typedef typename F::result_type result_type;

public:
  binary_composer(F const &f_, G const &g_, H const &h_)
    : f(f_), g(g_), h(h_) { }


  // 理論上這部分要 overload 四個版本 (不同的 const) 不過為了簡單起見我只寫了一個。
  result_type operator()(first_argument_type const &arg1,
                         second_argument_type const &arg2) const {
    return f(g(arg1), h(arg2));
  }
};

// 因為 class 的 constructor 是要在 class template 具現化之後才能被呼叫,所以我們
// 必須要寫一大串才可以產生 binary_composer 的 object。這很不方便,所以我們可以
// 再寫一個 function template,讓 C++ 的型別推導機制自動推導出來。
template <typename F, typename G, typename H>
binary_composer<F, G, H> compose2(F const &f, G const &g, H const &h) {
  return binary_composer<F, G, H>(f, g, h);
}
 
這樣萬事俱備,只要把它們組合起來就完成了!

int main() {
  // ... 略 ...

  sort(b.begin(), b.end(),
       compose2(less<int>(),
                dereference_to<int>(),
                dereference_to<int>()));

  for (size_t i = 0; i < b.size(); ++i) {
    cout << *b[i] << endl;
  }

  return EXIT_SUCCESS;
}

接下來我們把焦點放到用來印出數字的 for 迴圈,有沒有辦法寫得更為一般化呢?一般我們要印數字的時候,我們可以用 copy 加上 ostream_iterator 達成目標。不過這次 dereference_to 又為我們帶來一些麻煩!這次我們要寫 Output Iterator 的 Decorator:

template <typename I, typename D>
struct output_iterator_decorator
: public iterator<output_iterator_tag, typename D::argument_type> {
private:
  typedef output_iterator_decorator<I, D> THISCLASS;

private:
  I iterator;
  D decorator;

public:
  output_iterator_decorator(I const &iterator_, D const &decorator_)
  : iterator(iterator_), decorator(decorator_) {
  }

  THISCLASS &operator=(typename D::argument_type const &arg) {
    // 先呼叫 decorator 再把回傳值 assign 給 iterator
    *iterator = decorator(arg);
    return *this;
  }

  // dereference 運算子
  THISCLASS &operator*() {
    return *this;
  }

  // 前置遞增運算子
  THISCLASS &operator++() {
    ++iterator;
    return *this;
  }

  // 後置遞增運算子
  THISCLASS &operator++(int) {
    iterator++;
    return *this;
  }
};

// 和前面一樣,利用 C++ 的型別推導具現化 output_iterator_decorator
template <typename I, typename D>
output_iterator_decorator<I, D> decorate(I iter, D decorator) {
  return output_iterator_decorator<I, D>(iter, decorator);
}

有了以上的準備我們就可以把 for 迴圈變成:

  copy(b.begin(), b.end(),
       decorate(ostream_iterator<int>(cout, "\n"),
                dereference_to<int>()));

到此我們再回頭看一下完整的 main 函式:

int main() {
  int a[] = { 5, 4, 1, 2, 7, 3, 6 };

  vector<int *> b;
  b.push_back(&a[6]);
  b.push_back(&a[5]);
  b.push_back(&a[2]);
  b.push_back(&a[0]);
  b.push_back(&a[4]);
  b.push_back(&a[1]);
  b.push_back(&a[3]);

  sort(b.begin(), b.end(),
       compose2(less<int>(), dereference_to<int>(),
                             dereference_to<int>()));

  copy(b.begin(), b.end(),
       decorate(ostream_iterator<int>(cout, "\n"),
                dereference_to<int>()));

  return EXIT_SUCCESS;
}

看起來還是很不直觀,而且為了省幾行程式碼,結果寫了更多難懂的程式碼,好像有一點本末倒置。所以我們再更進一步!如果你可以用 Boost.Lambda 函式庫,你可以寫以下的程式碼:

#include <iostream>

#include <algorithm>
#include <vector>

#include <boost/lambda/lambda.hpp>

using namespace boost::lambda;
using namespace std;

int main() {
  int a[] = { 5, 4, 1, 2, 7, 3, 6 };

  vector<int *> b;
  b.push_back(&a[6]);
  b.push_back(&a[5]);
  b.push_back(&a[2]);
  b.push_back(&a[0]);
  b.push_back(&a[4]);
  b.push_back(&a[1]);
  b.push_back(&a[3]);

  // 排序 (以 int * 指向的值)
  sort(b.begin(), b.end(), ((*_1) < (*_2)));

  // 一個一個印出來
  for_each(b.begin(), b.end(), cout << *_1 << "\n");

  return EXIT_SUCCESS;
}

對!你沒有看錯!這樣就沒有了!這裡的 _1 與 _2 是神秘的 Function Object,Boost.Lambda 函式庫會自動幫你組合成一個合適的 Function Object。(*_1) 就像是我們的 dereference_to,而 ((*_1) < (*_2)) 就像是我們的 int_ptr_less_than_comparator。至於印出數字的部分,我另外做了一些改動。所以不能使用 copy,而是要改成 for_each。

Boost.Lambda 函式庫雖然看起來很漂亮,不過它本身是用很複雜的技巧寫出來的。正常運作的時候很漂亮,不過寫出 bug 的時候,編譯器會噴很多錯誤訊息,一般人根本不可能看得懂。某種意義上來說這已經是函式庫的極限了!

最新的 C++11 加入了 Lambda function (就是下面粗體字的部分),我們再也不需要 Boost.Lambda 也可以達到類似的效果:

#include <iostream>

#include <algorithm>
#include <vector>

using namespace std;

int main() {
  int a[] = { 5, 4, 1, 2, 7, 3, 6 };
 
  vector<int *> b; 
  b.push_back(&a[6]);
  b.push_back(&a[5]);
  b.push_back(&a[2]);
  b.push_back(&a[0]);
  b.push_back(&a[4]);
  b.push_back(&a[1]);
  b.push_back(&a[3]);

  // 排序 (以 int * 指向的值)
  sort(b.begin(), b.end(), [](int *pa, int *pb) { return *pa < *pb; });

  // 一個一個印出來
  for_each(b.begin(), b.end(), [](int *p) { cout << *p << "\n"; });

  return EXIT_SUCCESS;
}

讓我們期待 C++11 的到來吧!

--
備註:範例程式碼 http://www.csie.ntu.edu.tw/~b97073/B/iterator.tgz

2011年8月8日 星期一

處理 Stack Overflow

直接看程式碼比較快:

#define _XOPEN_SOURCE 600

#include <stdio.h>
#include <stdlib.h>
#include <setjmp.h>
#include <signal.h>

sigjmp_buf stack_overflow_guard;

char alter_stack[4096];

void sigsegv_handler(int sig) {
  if (sig == SIGSEGV) {
    /* Jump to catch block */
    siglongjmp(stack_overflow_guard, sig);
  }
}

void install_sigsegv_handler() {
  /* Prepare alternating stack */
  stack_t stk;

  stk.ss_sp = alter_stack;
  stk.ss_size = sizeof(alter_stack);
  stk.ss_flags = 0;

  if (sigaltstack(&stk, NULL) == -1) {
    fprintf(stderr, "ERROR: Unable to setup alternative stack.\n");
    exit(EXIT_FAILURE);
  }

  /* Register SIGSEGV handler */
  struct sigaction act;

  act.sa_handler = &sigsegv_handler;
  sigfillset(&act.sa_mask);
  act.sa_flags |= SA_ONSTACK;

  if (sigaction(SIGSEGV, &act, NULL) == -1) {
    fprintf(stderr, "ERROR: Unable to instal signal handler.\n");
    exit(EXIT_FAILURE);
  }
}

int trigger_stack_overflow(int a, int b, int c, int d, int e) {
  return trigger_stack_overflow(a + 1, b + 1, c + 1, d + 1, e + 1) *
         trigger_stack_overflow(a - 1, b - 1, c - 1, d - 1, e - 1);
}

int main() {
  install_sigsegv_handler();

  if (sigsetjmp(stack_overflow_guard, 0) == 0) {
    printf("normal path, got: %d\n",
           trigger_stack_overflow(0, 0, 0, 0, 0));
  } else {
    printf("stack overflow\n");
  }

  return EXIT_SUCCESS;
}

2011年5月20日 星期五

LaTeX 與 Beamer

最近因為一些需要,必需使用 LaTeX 製作投影片,所以研究了一下 Beamer 的使用方法。其實寫起來並不困難,和一般的 LaTeX article 差不多。Beamer 的 document structure 如下:

\documentclass{beamer}
\usetheme{Singapore} %% 這一個是佈景主題 (選用)
\begin{document}

\title{Title}
\author{Author Name}
\institute{Institute}

\frame{\titlepage} %% 顯示標題頁面

%% ... 其他內容 ...

\end{document}

接下來和 article 一樣,可以分為 section 與 subsection,而每一個 section 和 subsection 可以由若干個 frame 組成。

\section{Section 1}
%% 如果你有需要,可以在這裡加上 \subsection{Subsection 1.1} 
\begin{frame}
  \frametitle{Frame 1.1}
  \begin{itemize}
    \item{Item 1}
    \item{Item 2}
  \end{itemize}
\end{frame}

我們可以看到每個 frame 都會有一個 frametitle,也就是每一頁上方的標題。而下面就是該頁的內容。如果你要條列出各個要點,可以使用 itemize。

最後如果你需要顯示大綱,可以在 titlepage 下面加上:

\section*{Outlines}
\begin{frame}
  \frametitle{Outlines}
  \tableofcontents
\end{frame}

不過因為要建立 tableofcontents 所以要執行 pdflatex 編譯你的投影片二次。這樣你就可以得到一個精美的投影片了。你可以在這裡下載到範例投影片與原始檔

2011年4月30日 星期六

用 OpenMP 實作平行化的 Quicksort

OpenMP 和 Pthread 不同,我們沒有辦法很精確地控制有多少的 Thread 正在執行。所以如果要實作 Quicksort,我們不能直接使用 Recursion。相反地,我們必須引入 Threading Pool 的概念,讓各個 Thread 主動去某個工作清單拿工作。概念程式碼如下:

void quicksort_impl(sort_value_t array[],
                    stack_t *s,
                    size_t *busy_threads) {
  int idle = 1;

  size_t begin = 0;
  size_t end = 0;

  /* We should loop forever until everyone is idle */
  while (1) {
    while (idle) {
      /* This thread is idling.
         Try to get a new task from the stack. */

#pragma omp critical
      if (!stack_empty(s)) {
        /* Get the pending sort range, start work now! */
        sort_range_t *range = stack_top(s);
        begin = range->begin;
        end = range->end;
        stack_pop(s);

        ++*busy_threads;
        idle = 0;
      }

      /* If everyone finished his work, then leave now ... */
      if (*busy_threads == 0) {
        return;
      }
    }

    size_t size = end - begin;

    if (size <= 50) {
      sort_value_t *seg = array + begin;

      /* Use backward insertion sort when the sort range is small */
      size_t i, j;
      for (i = 1; i < size; ++i) {
        sort_value_t cur = seg[i];
        /* Move the number backward if they are greater than cur */
        for (j = i; j > 0 && seg[j - 1] > cur; --j) {
          seg[j] = seg[j - 1];
        }
        seg[j] = cur;
      }

      /* Mark begin as end so that we can acquire next sort range */
      begin = end;

#pragma omp critical
      --*busy_threads;

      idle = 1;

      /* Finished our range continue to acquire next range */
      continue;
    }


    /* Partition our range */
    sort_value_t *seg = array + begin;

    size_t pivot_index = rand() % size;
    SWAP(seg[0], seg[pivot_index]);
    sort_value_t pivot = seg[0];

    size_t left = 1;
    size_t right = size - 1;

    while (left < right) {
      while (seg[left] <= pivot && left < right) { left++; }
      while (seg[right] >= pivot && left < right) { right--; }
      SWAP(seg[left], seg[right]);
    }

    if (seg[left] > pivot) {
      left--;
    }
    SWAP(seg[0], seg[left]);

    /* Push left subrange to stack */
#pragma omp critical
    {
      sort_range_t range;
      range.begin = begin;
      range.end = begin + left;
      stack_push(s, &range);
    }

    /* Continue to sort right subrange */
    begin += left + 1;
  }
}

void quicksort(sort_value_t array[], size_t size) {
  size_t busy_threads = 0;
  stack_t *s = stack_new();
  sort_range_t all = { 0 , size };
  stack_push(s, &all);

  /* Run quicksort_impl in parallel */
#pragma omp parallel
  quicksort_impl(array, s, &busy_threads);

  stack_delete(s);
}

2011年4月13日 星期三

Haskell 與 State Monad

昨天試著用 HaskellState Monad,花了一點時間才寫出一個簡單的猜數字遊戲。感覺要在 Functional Programming Language 寫出 State 的概念比較困難...

import Control.Monad.State
import System.IO
import System.Random

gameLB, gameUB :: Integer
gameLB = 0
gameUB = 100

data GameState = GameState { steps :: Integer ,
                             lb :: Integer,
                             ub :: Integer }

initialState :: Integer -> Integer -> GameState
initialState l u = GameState { steps = 0, lb = l, ub = u }

updateState :: Integer -> Integer -> GameState -> GameState
updateState ans guess (GameState { steps = s, lb = l, ub = u }) =
    case (compare ans guess) of
      EQ -> GameState { steps = s + 1, lb = ans, ub = ans }
      LT -> GameState { steps = s + 1, lb = l, ub = min (guess - 1) u }
      GT -> GameState { steps = s + 1, lb = max l (guess + 1), ub = u }

range :: GameState -> (Integer, Integer)
range (GameState { lb = l, ub = u }) = (l , u)

stepCount :: GameState -> Integer
stepCount (GameState { steps = s }) = s

gameSession :: Integer -> StateT GameState IO Integer
gameSession ans =
    do (l, u) <- gets range
       lift $ putStr $ "Guess number (" ++ show l ++ ".." ++ show u ++ "): "
       lift $ hFlush stdout
       str <- lift $ getLine
       let num = read str
       modify (updateState ans num)
       case (compare ans num) of
         EQ -> gets stepCount >>= return
         LT -> gameSession ans >>= return
         GT -> gameSession ans >>= return

main :: IO ()
main = do rand <- getStdRandom (randomR (gameLB, gameUB))
          s <- evalStateT (gameSession rand) (initialState gameLB gameUB)
          putStrLn $ "Steps used: " ++ show s

備註:上面的程式用 ghc --make Guess.hs 就可以編譯了(有用到 mtl package

2011年3月12日 星期六

libffi 的範例

在 C/C++ 當中,函式的型別必需在編譯期固定下來才可以正確的呼叫。所以 Function Pointer 都要記錄 Function 的 Signature,簡單地說下面二個 Function Pointer 是不同的型別:

void (*func0)();
void (*func1)(int);

然而在很多情況下,我們只能在執行期決定我們要呼叫的函數。最常見的例子就是有提供 Native Function Call 的 Interpreter。例如:Python (C API)、Java (JNI) 等等。然而這種功能如果只能使用 C 語言實作,會變得非常龐雜、沒效率而且失去一般性。所以有了 libffi 的產生。

libffi 的全名是 Foreign Function Interface,他本身是用組合語言撰寫,不過他提供了一個 Portable 的 API。我們只要呼叫這些 API 就可以進行動態的函式呼叫(這和 Function Pointer 不同。如前所述,Function Pointer 必需在 Compiler Type 決定函式的型別。而 libffi 可以在執行期決定 Callee 的型別)。許多有名的計劃都是 libffi 的使用者,例如:CPythonOpenJDKDalvik VM 等等。

以下是簡單的範例程式:

#include <stdio.h>
#include <stdlib.h>

/* libffi 的 header */
#include <ffi.h>

/* 一些亂七八糟的函式 */
int func0() {
  printf("%s()\n", __func__);
  return 0;
}

int func1(int p0) {
  printf("%s(%d)\n", __func__, p0);
  return p0;
}

int func2(int p0, int p1) {
  printf("%s(%d, %d)\n", __func__, p0, p1);
  return p0 + p1;
}

/* 函式查詢表。也可以使用 hash table、binary tree 來實作,
   不過作為一個簡單的範例直接使用 array 就好了。 */
void *func_table[] = {
  (void *)func0, (void *)func1, (void *)func2,
};

int main(int argc, char **argv) {
  int i;

  /* 決定我們要使用哪一個函式 */
  if (argc < 2) {
    fprintf(stderr, "USAGE: %s parameter-count\n", argv[0]);
    exit(EXIT_FAILURE);
  }

  int parameter_count = atoi(argv[1]);

  if (parameter_count > 2) {
    fprintf(stderr, "WARNING: Parameter count is greater than 2.\n");
    parameter_count = 2;
  }

  /* 讀取函式的參數 */
  int values[2] = { 0 , 0 };

  for (i = 0; i < parameter_count; ++i) {
    if (scanf("%d", &values[i]) != 1) {
      static char const *const postfix[] = {
        "th", "st", "nd", "rd", "th",
        "th", "th", "th", "th", "th"
      };

      fprintf(stderr, "Unable to read %d%s parameter\n",
              i + 1, postfix[(i + 1) % 10]);
      exit(EXIT_FAILURE);
    }
  }

  /* 使用 libffi 呼叫指定的函式 */
  ffi_cif cif;
  ffi_type *args[] = { &ffi_type_sint, &ffi_type_sint };
  void *ptr_values[] = { &values[0], &values[1] };

  /* 準備 ffi 所需的資料結構 */
  if (ffi_prep_cif(&cif, FFI_DEFAULT_ABI, parameter_count,
                   &ffi_type_sint, args) == FFI_OK) {
    int sum = 0;

    /* 執行函式呼叫 */
    ffi_call(&cif, func_table[parameter_count], &sum, ptr_values);

    printf("return value: %d\n", sum);
  } else {
    fprintf(stderr, "Unable to initialize FFI structure.\n");
    exit(EXIT_FAILURE);
  }

  return EXIT_SUCCESS;
}

2011年2月12日 星期六

幫 C 語言加上 Garbage Collector

前幾天讀 Dragon Book 的時候突發奇想,何不自己實作 Conservative Garbage Collector?

一般來說 Tracing Garbage Collector 分成以下三個步驟:
  1. 先從 call stack 或者 global variables 的 references (或者 pointers) 蒐集 object 的位址作為 root set,並標記 root set 當中的 object。
  2. 檢視 root set 當中的 object。如果 object 也包含 reference,而且這個 reference 指向的 object' 沒有被標記過,就標記並檢視 object'。(不斷遞迴直到沒有新的 object 被標記)
  3. 清除並回收沒有被標記的 object。
然而為了有效率地計算 root set 與走訪被標記的 objects,通常我們會需要編譯器加入一些額外的資訊。而且程式語言必需是 type-safe 的!著名的例子如 Java 或者是 Haskell、Common Lisp 等等。而 C/C++ 就不適合當 Garbage Collector 的目標語言,因為 C/C++ 支援 Pointer Arithmetic、Union Type 等語言構件,使得我們幾乎不可能在 C/C++ 之上實作 (Accurate) Garbage Collector。

不過 Conservative Garbage Collector 的做法不太一樣。Conservative Garbage Collector 的想法是:我不去分辨誰是 Pointer Type 誰是 Integer Type,只要「看起來」像是 Pointer 的 bit pattern,我就把他當成 Pointer。這有一個好處:我們不需要編譯器的幫忙也可以寫 Garbage Collector,所以可以和其他的程式碼混用;然而也有一個壞處:有時候 Conservative Garbage Collector 過於保守,以至於無法回收一些可以回收的記憶體。以下我就以一個小程式展現 Conservative Garbage Collector 的基本概念。

首先我們的第一個問題是如何找到 root set?我們可以取 main 函式 argc 的位址,另外在 gc_cleanup_ 函式隨意傳入一個整數,再取該整數的位址。所有還可能會用到的 pointer 就會在這二個整數之間:

void *stack_top;

void gc_init(int *argc_ptr) {
  stack_top = argc_ptr;
}

void gc_cleanup_(int stack_indicator) {
  void *stack_bottom = &stack_indicator; 
  /* Stack 的有效範圍就介於 [stack_bottom, stack_top] 之間 */
}

int main(int argc, char **argv) {
  gc_init(&argc);
}

當然我們還必需考慮 alignment 的問題,所以要把 stack_top 與 stack_bottom 對齊到 sizeof(void *) 的某倍。這可以用以下二個巨集來達成:

#define FLOOR_ALIGN_TO_WORD(X) \
    (((uintptr_t)(X)) / sizeof(void *) * sizeof(void *))

#define CEIL_ALIGN_TO_WORD(X) \
    (((uintptr_t)(X) + sizeof(void *) - 1) / \
        sizeof(void *) * sizeof(void *))

接著我們要準備一些空間來當我們的 Heap,所以我們在 gc_init 之中加入一些程式碼:

static char *heap_begin = NULL;
static char *heap_free = NULL;
static char *heap_end = NULL;

/* Initialize the internal data structure of the
   garbage collector */
void gc_init(int *argc_ptr) {
    /* We assume that argc is the variable in the
       stack having the highest address. */
    stack_top = (void **)CEIL_ALIGN_TO_WORD(argc_ptr);

    /* Allocate the heap using mmap. */
    heap_begin = (char *)mmap(0, GC_HEAP_SIZE,
                              PROT_READ | PROT_WRITE,
                              MAP_PRIVATE | MAP_ANONYMOUS,
                              -1, 0);

    if (!heap_begin || heap_begin == MAP_FAILED) {
        fprintf(stderr,
                "Unable to allocate the heap (size=%lu).\n",
                (unsigned long)GC_HEAP_SIZE);
        exit(EXIT_FAILURE);
    }

    heap_free = heap_begin;
    heap_end = heap_begin + GC_HEAP_SIZE;
}

接著我們需要一些資料結構用以管理我們分配出去的記憶體。做為一個簡單的範例,我只用了一個陣列依序記錄分配出去的記憶體,我的陣列會從 heap_end 開始往 heap_begin 的方向生長。

當然這是一個很沒有效率的作法,比較有效率的方法是使用類似 buddy system 之類的資料結構來管理 heap。

enum {
    ALLOC_STATUS_UNKNOWN,
    ALLOC_STATUS_TOUCHED,
    ALLOC_STATUS_REFERRED,
    ALLOC_STATUS_DEALLOCATED,
};

typedef struct alloc_record_ {
    char *addr;
    size_t size;
    unsigned int status;
} alloc_record;

為了操作 alloc_record 資料結構,我寫了三個函式,其功能分述如下(程式碼就不再贅述):

static alloc_record *find_alloc_record(char *addr) {
  /* 給定一個位址,找出它是屬於哪一次 allocation */
}

static void insert_alloc_record(char *addr, size_t size) {
  /* 在 alloc_record 的尾端加上一筆記錄 */
}

static void *allocate_deallocated_block(size_t size) {
  /* 重新把 deallocated 過的位址配置給不同的人 */
}

接下來是 Malloc 的程式碼。我們先嘗試直接從 heap_free 配置記憶體,如果記憶體不夠,再嘗試使用別人 deallocate 的記憶體。如果以上二者都沒有用,我們就必需進行 Garbage Collection。(備註:gc_cleanup(); 即 gc_cleanup_(0);)

/* Allocate a block with given bytes. */
void *gc_malloc(size_t size) {
    /* Make sure that all of returned address is aligned */
    size = CEIL_ALIGN_TO_WORD(size);

    /* Try to allocate if free space available */
    if (is_free_space_avail(size)) {
        char *result = heap_free;
        heap_free += size;
        insert_alloc_record(result, size);
        return result;
    }

    /* Try to allocate from deallocated address. */
    void *result = allocate_deallocated_block(size);
    if (result) {
        return result;
    }

    /* Try to collect garbage and retry. */
    while (gc_cleanup()) {
        void *result = allocate_deallocated_block(size);
        if (result) {
            return result;
        }
    }

    return NULL;
}

當然有 Malloc 就要有 Free,不過值得注意的是:為了提高 Conservative Garbage Collector 的回收率,減少「應該 deallocate 而沒有 deallocate」的情況,我們應該要抹除整個 object 與傳入的 Pointer

/* Deallocate and wipe the allocated block. */
static void deallocate(alloc_record *record) {
    /* Mark this block as deallocated */
    record->status = ALLOC_STATUS_DEALLOCATED;

    /* Wipe the memory block for cleaning the possible
       pointer address */
    memset(record->addr, '\0', record->size);
}

/* Deallocate the given address and wipe the pointer. */
void gc_free_(void **addr_ptr) {
    alloc_record *record = find_alloc_record(*addr_ptr);
    if (!record) {
        fprintf(stderr, "Memory corrupted\n");
        return;
    }

    deallocate(record);
    *addr_ptr = NULL;
}

最後就是最關鍵的 Garbage Collection 的程式碼 gc_cleanup

/* Garbage collection and deallocate unused memory. */
size_t gc_cleanup_(int stack_indicator) {
    alloc_record *records =
        (alloc_record *)heap_end - alloc_record_count;

    size_t i, j;

    /* Mark alloc record as UNKNOWN */
    for (i = 0; i < alloc_record_count; ++i) {
        if (records[i].status != ALLOC_STATUS_DEALLOCATED) {
            records[i].status = ALLOC_STATUS_UNKNOWN;
        }
    }

    /* Scan the stack */
    void **stack_bottom =
        (void **)CEIL_ALIGN_TO_WORD(&stack_indicator);
    assert(stack_top != NULL);
    assert(stack_bottom <= stack_top);
    scan_and_touch(stack_bottom, (void **)stack_top);

    /* Scan the heap */
    scan_touched_objects();

    /* Deallocate and reset the status */
    static unsigned int gc_count = 0;
    static char const sep[] =
    "-------------------------------------------------------";

    fprintf(stderr, "%s\n", sep);
    fprintf(stderr,
            "GARBAGE COLLECTION ROUND #%u\n", ++gc_count);

    size_t deallocated_count = 0;
    for (j = alloc_record_count, i = j - 1; j > 0; --i, --j) {
        if (records[i].status == ALLOC_STATUS_UNKNOWN) {
            fprintf(stderr,
                    "  Reclaim [addr: %p, size: %lu]\n",
                    records[i].addr,
                    (unsigned long)records[i].size);

            deallocate(&records[i]);
            ++deallocated_count;
        }
    }

    fprintf(stderr, "%s\n\n", sep);

    return deallocated_count;
}

/* Scan for address between [begin, end) */
static void scan_and_touch(void *begin_, void *end_) {
    void **begin = (void **)FLOOR_ALIGN_TO_WORD(begin_);
    void **end = (void **)FLOOR_ALIGN_TO_WORD(end_);

    for (; begin < end; ++begin) {
        char *addr = (char *)*begin;

        if (addr >= heap_begin && addr < heap_free) {
            alloc_record *record = find_alloc_record(addr);

            if (record && record->status ==
                    ALLOC_STATUS_UNKNOWN) {
                record->status = ALLOC_STATUS_TOUCHED;
            }
        }
    }
}

/* Scan the touched objects */
static void scan_touched_objects() {
    alloc_record *records =
        (alloc_record *)heap_end - alloc_record_count;

    size_t count, i;

    do {
        count = 0;
        for (i = 0; i < alloc_record_count; ++i) {
            if (records[i].status != ALLOC_STATUS_TOUCHED) {
                /* Not in touched state, skip it. */
                continue;
            }

            /* Mark as referred. */
            records[i].status = ALLOC_STATUS_REFERRED;

            /* Scan this object */
            scan_and_touch(
                (void **)(records[i].addr),
                (void **)(records[i].addr + records[i].size));

            count++;
        }
    } while (count > 0);
}

以上程式碼就是一個具體而微的 Conservative Garbage Collector,完整的程式碼與測試程式可以從這裡下載:conservativegc.h , conservativegc.c , test_tree.c , test_many.c 。當然這個 Conservative Garbage Collector 還少了很多東西:包括計算 static storage 的 root set(在 Linux 之下可以檢查 etext 與 end 二者之間的值),還有更有效率的 allocation policy 等等。

備註:如果你真的有在 C/C++ 使用 Garbage Collector 的需求可以參考 libgc,一個由 BoehmDemersWeiser 等前輩撰寫 (他們是提出 Conservative Garbage Collector 的重要前輩),並被移植到多個平台的 Conservative Garbage Collector 函式庫。

2011年1月25日 星期二

Rvalue Reference 與 String 的實作

Rvalue ReferenceC++0x 當中,一個非常重要的新功能。有了 Rvalue Reference,我們就可以容易地寫出具有 Moving Semantics 的 class。如果善用它,我們可以寫出更有效率的 C++ 程式碼。目前比較有名的編譯器如 GCC 或者 Visual C++ 都已經有 Rvalue Reference。

說來說去,Rvalue Reference 究竟是什麼?

首先我們必須要介紹 Lvalue 與 Rvalue。C++ 程式語言之中,Rvalue 與 Lvalue 相反的概念,Rvalue 的定義是「不是 Lvalue 的 Object(或曰 Variable),就是 Rvalue」。那 Lvalue 又是什麼?所謂的 Lvalue 是指在記憶體上有固定位置可以取址,可以放在指派運算子左邊的值。例如:

int a, b, c;
a = 1; b = 2; c = 3;  // (1)
a = b + c;  // (2)

第一行當中 a、b、c 三個變數都是放在指派運算子的左邊,所以都是 Lvalue。而第二行當中,a 是 Lvalue,(b+c) 這個表達式是 Rvalue。下面的程式很明顯是不合法的:

b+c = 0; // X

因為 b+c 這個 Rvalue 在記憶體上沒有固定的位址(翻譯為機器碼後,b+c 通常會被儲存在暫存器),所以自然也不能把 0 指派給他。但是這和 Rvalue Reference 有什麼關係呢?我們看看這個例子:

string a("abc");
string b("def");
string c("ghi");

string all = a + b + c + a;  // (3)

第三行的 (a+b) 、((a+b)+c)、(((a+b)+c)+a) 都是 Rvalue。在 C++98 當中,我們只能把 Rvalue 綁定到 const 修飾過的 Reference,所以我們的 operator+ 通常會這樣寫:

string operator+(string const &lhs, string const &rhs)  {
  string result(lhs); // (4) copy lhs
  result += rhs; // (5) concat rhs
  return result;
}

每當我們呼叫 operator+,我們就要複製一份 lhs,然後再把 rhs 串接上去。我們仔細地看一下上述程式的執行過程:
  1. 首先執行 a+b 的時候,我們要先複製一份 a(稱為 result1),再把 b 串接到 result1 的後面,最後回傳 result1。
  2. 接著我們要計算 result1+c 的結果。我們必需先複製一份 result1(稱之為 result2),再把 c 串接到 result2 的後面,最後回傳 result2。
  3. 最後我們要計算 result2+a 的結果,我們必需要先複製一份 result2(稱之為 result3),再把 a 串接到 result3 的後面,最後回傳 result3。
這個程式非常沒有效率。它會複製一份 result1,一份 result2,可是複製完就忘掉 result1 與 result2!為了減少這樣的浪費,有人提出了 Move Semantics:

struct string_tmp {
  string *ptr;
  string_tmp(string *s) : ptr(s) { }
  ~string_tmp() { delete ptr; }
};

// Move Constructor
string::string(string_tmp rhs) {
  internal_move(rhs); 
}

// operator=
string &string::operator=(string_tmp rhs) {
  internal_move(rhs);
  return *this;
}

void string::internal_move(string_tmp with) {
  if (buf) {
    delete [] buf;
  }

  buf = with.ptr->buf;
  buf_size = with.ptr->buf_size;
  str_length = with.ptr->str_length;

  with.ptr->buf = NULL;
  with.ptr->buf_size = 0;
  with.ptr->str_length = 0; 
}

string_tmp operator+(string const &lhs, string const &rhs) {
  string_tmp result(new string(lhs));
  *(result.ptr) += rhs;
  return result;
}

string_tmp operator+(string_tmp lhs, string const &rhs) {
  *(lhs.ptr) += rhs;
  return lhs;
}

上面的程式碼最重要的概念:operator+ 回傳的型別變成 string_tmp。然後多載 operator+ 與 operator=。這二個 operator 如果看到 string_tmp,就會想辦法把裡面的東西偷出來用,從而避免無謂的複製!

雖然在 C++98 當中,我們可以模仿出 Move Semantics,不過寫起來即為痛苦,不但需要黑魔法(上面的程式碼並不完整,詳情請參閱 moving_string.hpp),而且難以維護,如果 C++ 可以讓我們把 Rvalue 偷出來用就好了!

這就是 Rvalue Reference 可以讓我們做的事!

Rvalue Reference 在 C++ 的 notation 如下:

T &&rref = ... ;

我們可以把 Rvalue 綁定到 Rvalue Reference 上,例如:

string &&rref = a + b;

但是我們沒有辦法直接把 Lvalue 綁定到 Rvalue Reference 上,下面這個 statement 是不合法的:

string a;
string &&rref = a; // X

這是為了防止我們錯誤地把 Lvalue 當成 Rvalue。還記得嗎?Lvalue 是在記憶體上有特定位置,程序員碰得到 Lvalue,因此我們不能偷  Lvalue 的東西。如果我想告訴編譯器:「沒關係,這個變數我不在乎你去偷東西,你把它當成 Rvalue 就可以了!」可以使用 std::move:

string &&rref = std::move(a);

說了這麼多,到底要怎麼用 Rvalue Reference?和前面的 string_tmp 一樣,我們必需要多載 operator+、operator=、move constructor:

// Move Constructor
string::string(string &&rhs)
  : buf(NULL), buf_size(0), str_length(0) {

  internal_move(rhs);
}

// operator=
string &string::operator=(string &&rhs) {
  if (this == &rhs) {
    return *this;
  }
  internal_move(rhs);
  return *this;
}

void string::internal_move(string &with) throw () {
  if (buf) {
    delete [] buf;
  }

  buf = with.buf;
  buf_size = with.buf_size;
  str_length = with.str_length;

  with.buf = nullptr;
  with.buf_size = 0;
  with.str_length = 0;
}

// operator+
string &&operator+(string &&lhs, string const &rhs) {
  lhs += rhs;
  return std::move(lhs); // convert lhs to r-value again
}

我們可以注意到:我們不再需要 string_tmp,編譯器可以幫我們分辨何者是 Rvalue,並告訴我們可不可以從該物件偷取資源!

結語:有了 Rvalue Reference,我們可以寫出更有效率的程式碼。到了 C++0x 的時代,一個好的 C++ 程序員應該要能掌握 Rvalue Reference 的威力!

完整的程式碼:test.cppsimple_string.hpp(一般的字串)、moving_string.hpp(有 move semantics 的字串)、rrefopt_string.hpp(使用 rvalue reference 的字串)。