webmagic源码分析

前言

在文章《webmagic核心设计和运行机制分析》中已经提到WebMagic内部是通过生产者/消费者模式来实现的,本篇我们就分析一下WebMagic的源代码,先从爬虫入口类main方法开始。

爬虫入口类main方法

public static void main(String[] args) {
    Spider.create(new GithubRepoPageProcessor())
            //从https://github.com/code4craft开始抓    
            .addUrl("https://github.com/code4craft")
            //设置Scheduler,使用Redis来管理URL队列
            .setScheduler(new RedisScheduler("localhost"))
            //设置Pipeline,将结果以json方式保存到文件
            .addPipeline(new JsonFilePipeline("D:\\data\\webmagic"))
            //开启5个线程同时执行
            .thread(5)
            //启动爬虫
            .run();
}

通过官方给出的创建爬虫入口类的样例代码可以看到,启动爬虫是调用Spider.run()方法。

Spider类源码分析

1. Spider.run()方法

checkRunningStat()检查运行状态,不是很重要,跳过。

@Override
public void run() {
    // 检查运行状态
    checkRunningStat();
    // 初始化组件
    initComponent();
    logger.info("Spider {} started!",getUUID());
    // 死循环从Scheduler中拉取Request(Request中封装了url)
    while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
        final Request request = scheduler.poll(this);
        if (request == null) {
            if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
                break;
            }
            // wait until new url added
            // 当Scheduler中不存在Request时,线程等待
            waitNewUrl();
        } else {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 处理Request,核心方法
                        processRequest(request);
                        // 调用监听器onSuccess()方法
                        onSuccess(request);
                    } catch (Exception e) {
                        // 调用监听器onError()方法
                        onError(request);
                        logger.error("process request " + request + " error", e);
                    } finally {
                        pageCount.incrementAndGet();
                        // 唤醒线程
                        signalNewUrl();
                    }
                }
            });
        }
    }
    stat.set(STAT_STOPPED);
    // release some resources
    if (destroyWhenExit) {
        close();
    }
    logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get());
}

2. initComponent()初始化组件:设置默认Downloader实现,初始化线程池

//初始化组件
protected void initComponent() {
    if (downloader == null) {
        // 默认使用HttpClientDownloader
        this.downloader = new HttpClientDownloader();
    }
    if (pipelines.isEmpty()) {
        pipelines.add(new ConsolePipeline());
    }
    downloader.setThread(threadNum);
    // 初始化线程池
    if (threadPool == null || threadPool.isShutdown()) {
        if (executorService != null && !executorService.isShutdown()) {
            threadPool = new CountableThreadPool(threadNum, executorService);
        } else {
            threadPool = new CountableThreadPool(threadNum);
        }
    }
    if (startRequests != null) {
        for (Request request : startRequests) {
            addRequest(request);
        }
        startRequests.clear();
    }
    startTime = new Date();
}

3. waitNewUrl() / signalNewUrl() :配合Scheduler对象实现生产者/消费者模式

// 线程等待
private void waitNewUrl() {
    newUrlLock.lock();
    try {
        //double check
        if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
            return;
        }
        newUrlCondition.await(emptySleepTime, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        logger.warn("waitNewUrl - interrupted, error {}", e);
    } finally {
        newUrlLock.unlock();
    }
}

// 线程唤醒
private void signalNewUrl() {
    try {
        newUrlLock.lock();
        newUrlCondition.signalAll();
    } finally {
        newUrlLock.unlock();
    }
}

在Spider类属性中包含了默认Scheduler实现类QueueScheduler的对象scheduler,而在QueueScheduler类中默认使用内存阻塞队列来存储Request。

// Spider类属性
protected Scheduler scheduler = new QueueScheduler();
public class QueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler {
    // 默认使用内存阻塞队列来存储Request对象
    private BlockingQueue<Request> queue = new LinkedBlockingQueue<Request>();

    @Override
    public void pushWhenNoDuplicate(Request request, Task task) {
        queue.add(request);
    }

    @Override
    public Request poll(Task task) {
        return queue.poll();
    }

    @Override
    public int getLeftRequestsCount(Task task) {
        return queue.size();
    }
    
    @Override
    public int getTotalRequestsCount(Task task) {
        return getDuplicateRemover().getTotalRequestsCount(task);
    }
}

看到这里,有人可能会疑惑当阻塞队列中没有Request对象时,线程会卡死在Spider.run()方法中waitNewUrl()上。

其实只要我们看Spider.addUrl()Spider.addRequest()两个方法的源码,就会发现其中调用了线程唤醒方法Spider.signalNewUrl(),而在爬虫入口类中必然会调addUrl()addRequest()其中一个来设置起始url,所以不存在线程运行卡死的情况。

public Spider addUrl(String... urls) {
    for (String url : urls) {
        addRequest(new Request(url));
    }
    // 调用线程唤醒方法
    signalNewUrl();
    return this;
}

public Spider addRequest(Request... requests) {
    for (Request request : requests) {
        addRequest(request);
    }
    // 调用线程唤醒方法
    signalNewUrl();
    return this;
}

private void addRequest(Request request) {
    if (site.getDomain() == null && request != null && request.getUrl() != null) {
            site.setDomain(UrlUtils.getDomain(request.getUrl()));
    }
    // 将request推送到scheduler内存阻塞队列中去
    scheduler.push(request, this);
}

这样就构成了一个典型的生产者/消费者模式代码实现。

4. processRequest():爬虫业务逻辑的核心方法

首先调用Downloader下载网页,再调用自定义的PageProcessor解析网页文本并从中提取出目标数据,最后调用自定义的Pipeline持久化目标数据。

private void processRequest(Request request) {
    // 调用Downloader对象下载网页
    Page page = downloader.download(request, this);
    if (page.isDownloadSuccess()){
        // 下载成功
        onDownloadSuccess(request, page);
    } else {
        onDownloaderFail(request);
    }
}

private void onDownloadSuccess(Request request, Page page) {
    if (site.getAcceptStatCode().contains(page.getStatusCode())){
        // 调用自定义的PageProcessor对象(通过入口类设置)解析封装网页的Page对象
        pageProcessor.process(page);
        // 添加后续需要爬取的url字符串,推送Request到Scheduler中去
        extractAndAddRequests(page, spawnUrl);
        if (!page.getResultItems().isSkip()) {
            for (Pipeline pipeline : pipelines) {
                // 调用自定义的Pipeline对象(通过入口类设置)持久化目标数据,通过ResultItems对象封装传递
                pipeline.process(page.getResultItems(), this);
            }
        }
    } else {
        logger.info("page status code error, page {} , code: {}", request.getUrl(), page.getStatusCode());
    }
    sleep(site.getSleepTime());
    return;
}

private void onDownloaderFail(Request request) {
    if (site.getCycleRetryTimes() == 0) {
        sleep(site.getSleepTime());
    } else {
        // for cycle retry
        // 下载失败后重试
        doCycleRetry(request);
    }
}

protected void extractAndAddRequests(Page page, boolean spawnUrl) {
    // 添加target Request
    if (spawnUrl && CollectionUtils.isNotEmpty(page.getTargetRequests())) {
        for (Request request : page.getTargetRequests()) {
            // 推送Request到Scheduler内存阻塞队列中去
            addRequest(request);
        }
    }
}

5. onSuccess() / onError() :监听Request处理成功或失败的情况

只有需要自定义SpiderListener监听器时才会使用到,这里不是重点,不再赘述,具体使用方法可参考源码 https://github.com/xiawq87/sp... 中的实现。

小结

通过上述源码分析的过程,让我们可以大体了解WebMagic内部生产者/消费者模式的实现方式。

相关推荐