Google Fusion Tablesの書き込みについて at ミネルヴァの梟は黄昏とともに飛び始める

ミネルヴァの梟は黄昏とともに飛び始める

Follow me on Twitter: http://www.twitter.com/dddaisuke

Google Fusion Tablesの書き込みについて

with one comment

昨日は、Google and Bing Maps Hackathon in Japanに主催者として参加してきました。参加されたみなさまお疲れさまでした。

私はMaps系のテクノロジーに詳しくないのですが、今回はじめて、現在のGoogle Mapsで扱う大量のデータを格納しておけるGoogle Fusion Tablesというものを知りました。

Google Fusion TablesにCSVデータをアップロードすると、サーバー内部で取り込み処理が実行されますが結構遅いという印象があります。郵便局が公開している郵便番号の全国データは約12万レコードあるのですが、これを処理するのに10分ぐらいかかりました。

ファイルのアップロード自体はすぐに終わるのですが、そこからの処理で結構時間がかかります。もちろん、データ自体は処理が完了したものがすぐに閲覧できますし、進捗が何%かという情報もリアルタイムで確認する事ができます。(個人的な感覚からすると、もう少し書き込み処理が速くてもいいんじゃないかと思いました)

私は、ウェブからアップロードされたデータ加工についてはウェイト処理が入ってるのかもしれないと考え、Google Fusion Tables API経由で直接インサートするプログラムを書いてマルチスレッドで一気に書き込みをしてみましたが、500エラー(Internal Server Error)が大量に返却され使えませんでした。一応、リトライ処理も入れてあるのですが、プログラムの実行開始直後からエラーの量がどんどん増えていきました。

結果としては、残念でしたがAPIの使い方とか知りたい人もいるでしょうからコードを置いておきます。必要なライブラリはimport文から推測して探して下さい。


※使い捨てのコードなのでほとんどのエラー処理をしていないのと、スレッド数の監視をしていません。

package com.daisukeyamashita.gmaps.fusion.tools;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.URL;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Scanner;
import java.util.regex.MatchResult;
import java.util.regex.Pattern;

import jp.sourceforge.csvparser.BasicCSVUtility;
import jp.sourceforge.csvparser.CSVReader;
import jp.sourceforge.csvparser.CSVUtility;

import com.google.gdata.client.ClientLoginAccountType;
import com.google.gdata.client.GoogleService;
import com.google.gdata.client.Service.GDataRequest;
import com.google.gdata.client.Service.GDataRequest.RequestType;
import com.google.gdata.util.ContentType;
import com.google.gdata.util.ServiceException;

public class Uploader {

    private static final String GOOGLE_ACCOUNT = "example@gmail.com";
    private static final String PASSWORD = "password";

    /**
     * Google Fusion Tables API URL stem.
     * All requests to the Google Fusion Tables server
     * begin with this URL.
     *
     * The next line is Google Fusion Tables API-specific code:
     */
    private static final String SERVICE_URL = "https://www.google.com/fusiontables/api/query";

    /**
     * CSV values are terminated by comma or end-of-line and consist either of
     * plain text without commas or quotes, or a quoted expression, where inner
     * quotes are escaped by doubling.
     */
    private static final Pattern CSV_VALUE_PATTERN = Pattern
            .compile("([^,\\r\\n\"]*|\"(([^\"]*\"\")*[^\"]*)\")(,|\\r?\\n)");

    /**
     * Handle to the authenticated Google Fusion Tables service.
     *
     * This code uses the GoogleService class from the
     * Google GData APIs Client Library.
     */
    private GoogleService service;

    public Uploader(String filename, String tablename) {
        CSVReader reader = null;
        try {
            System.out.println(filename);
            service = new GoogleService("fusiontables", "fusiontables.ApiExample");
            service.setUserCredentials(GOOGLE_ACCOUNT, PASSWORD, ClientLoginAccountType.GOOGLE);

            CSVUtility utility = new BasicCSVUtility();
            reader = utility.createCSVReader(new BufferedReader(new FileReader(filename)));

            String[] columnNames = null;
            columnNames = reader.readRecord();
            if (columnNames == null) {
                System.err.println(filename + " is blank file."); //$NON-NLS-1$
                return;
            }
            String tableId = runUpdate(getCreateTableSql(tablename, columnNames));

            String[] columns = null;
            while ((columns = reader.readRecord()) != null) {
                asyncUpdate(getInsertTableSql(tableId, columnNames, columns));
            }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ServiceException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            try {
                if (reader != null) {
                    reader.close();
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    ArrayList<String> list = new ArrayList<String>();

    private void asyncUpdate(String sql) {
        synchronized (list) {
            list.add(sql);
            if (list.size() == 1000) {
                new Thread(new Runnable() {

                    @Override
                    public void run() {
                        ArrayList<String> sqls = null;
                        synchronized (list) {
                            sqls = (ArrayList<String>) list.clone();
                            list.clear();
                        }
                        boolean isRetry = false;
                        for (String sql : sqls) {
                            do {
                                isRetry = false;
                                try {
                                    runUpdate(sql);
                                } catch (IOException e) {
                                    // TODO Auto-generated catch block
                                    e.printStackTrace();
                                } catch (ServiceException e) {
                                    if (e.getMessage()
                                            .startsWith(
                                                "Unable to execute query (which appears to be correct) at this time. Please retry.")) {
                                        System.err.println("retry");
                                        isRetry = true;
                                    } else {
                                        e.printStackTrace();
                                    }
                                }
                            } while (isRetry);
                        }
                    }
                }).start();
            }
        }
    }

    private String getInsertTableSql(String tableId, String[] columnNames, String[] columns) {
        StringBuilder sb = new StringBuilder();
        sb.append("INSERT INTO ");
        sb.append(tableId);
        sb.append(" (");

        int count = 0;
        for (String columnName : columnNames) {
            if (count != 0) {
                sb.append(", ");
            }
            sb.append("'" + columnName + "'");
            count++;
        }
        sb.append(") values (");

        count = 0;
        for (String column : columns) {
            if (count != 0) {
                sb.append(", ");
            }
            sb.append("'" + column + "'");
            count++;
        }
        sb.append(")");
        return sb.toString();
    }

    private String getCreateTableSql(String tablename, String[] columns) {
        StringBuilder sb = new StringBuilder();
        sb.append("CREATE TABLE ");
        sb.append(tablename);
        sb.append(" (");

        int count = 0;
        for (String column : columns) {
            if (count != 0) {
                sb.append(", ");
            }
            sb.append("'" + column + "'" + ": STRING");
            count++;
        }
        sb.append(")");

        return sb.toString();
    }

    /**
     * Two versions of ApiExample() are provided:
     * one that accepts a Google user account ID and password for authentication,
     * and one that accepts an existing auth token.
     */

    /**
     * Executes insert, update, and delete statements.
     * Prints out results, if any.
     *
     * This code uses the GDataRequest class and getRequestFactory() method
     * from the Google Data APIs Client Library to construct a POST request.
     * The Google Fusion Tables API-specific part is in the use
     * of the service URL. A Google Fusion Tables API INSERT, UPDATE,
     * or DELETE statement will be passed into this method in the
     * updateQuery parameter.
     */

    public String runUpdate(String updateQuery) throws IOException, ServiceException {
        System.out.println(updateQuery);
        URL url = new URL(SERVICE_URL);
        GDataRequest request = service.getRequestFactory().getRequest(RequestType.INSERT, url,
            new ContentType("application/x-www-form-urlencoded"));
        OutputStreamWriter writer = new OutputStreamWriter(request.getRequestStream());
        writer.append("sql=" + URLEncoder.encode(updateQuery, "UTF-8"));
        writer.flush();

        request.execute();

        String decoded = null;
        Scanner scanner = new Scanner(request.getResponseStream(), "UTF-8");
        while (scanner.hasNextLine()) {
            scanner.findWithinHorizon(CSV_VALUE_PATTERN, 0);
            MatchResult match = scanner.match();
            String quotedString = match.group(2);
            decoded = quotedString == null ? match.group(1) : quotedString.replaceAll("\"\"", "\"");
            System.out.print("|" + decoded);
            if (!match.group(4).equals(",")) {
                System.out.println("|");
            }
        }
        return decoded;
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        if (args.length < 2) {
            System.err
                    .println("Usage:java com.daisukeyamashita.gmaps.fusion.tools.Uploader filename tablename");
            return;
        }
        String filename = args[0];
        String tablename = args[1];
        new Uploader(filename, tablename);
    }

}

Written by daisuke

2月 20th, 2011 at 11:47 am

One Response to 'Google Fusion Tablesの書き込みについて'

Subscribe to comments with RSS or TrackBack to 'Google Fusion Tablesの書き込みについて'.

  1. 500件までのBatch処理はそれなりに早かったです↓。
    http://code.google.com/p/fusion-tables/issues/detail?id=608

    No

    7 2月 12 at 18:56:25

Leave a Reply