I have set up Zipline locally in PyCharm. The simulations work, and I also have access to premium data from Quandl (which updated automatically when I entered my API key). My question now is: how do I build and run a pipeline locally using Zipline?
Asked
Active
Viewed 403 times
1 Answers
1
Zipline's documentation is challenging, and zipline.io is also down (as of 2021-04-05). Fortunately, Blueshift has documentation and sample code that shows how to make a pipeline that can be run locally:
- Blueshift sample pipeline code is here. (Pipelines library here.)
- Zipline documentation can be accessed from MLTrading (archive documentation here); though challenging, it is still useful.
- The full pipeline sample code from Blueshift, modified to run locally through PyCharm, is below the line. Please note, as I'm sure you're already aware, that this is a bad strategy and you shouldn't trade on it. It does, however, show how to instantiate pipelines locally.
"""
Title: Classic (Pedersen) time-series momentum (equal weights)
Description: This strategy uses past returns and goes long (short)
the positive (negative) n-percentile
Style tags: Momentum
Asset class: Equities, Futures, ETFs, Currencies
Dataset: All
"""
"""
Sources:
Overall Algorithm here:
https://github.com/QuantInsti/blueshift-demo-strategies/blob/master/factors/time_series_momentum.py
Custom (Ave Vol Filter, Period Returns) Functions Here:
https://github.com/QuantInsti/blueshift-demo-strategies/blob/master/library/pipelines/pipelines.py
"""
import numpy as np
from zipline.pipeline import CustomFilter, CustomFactor, Pipeline
from zipline.pipeline.data import EquityPricing
from zipline.api import (
order_target_percent,
schedule_function,
date_rules,
time_rules,
attach_pipeline,
pipeline_output,
)
def average_volume_filter(lookback, amount):
    """
    Build a pipeline filter that passes assets whose mean
    close * volume over the lookback window is above a threshold.

    Args:
        lookback (int): lookback window size, used as the filter's
            ``window_length``
        amount (int): threshold; an asset passes only when its mean
            close * volume is strictly greater than this value

    Returns:
        A custom filter object

    Examples::

        # from library.pipelines.pipelines import average_volume_filter
        pipe = Pipeline()
        volume_filter = average_volume_filter(200, 1000000)
        pipe.set_screen(volume_filter)
    """

    class AvgDailyDollarVolumeTraded(CustomFilter):
        # The two inputs arrive in compute() in this order.
        inputs = [EquityPricing.close, EquityPricing.volume]

        def compute(self, today, assets, out, close_price, volume):
            # Element-wise price * volume, averaged along axis 0, then
            # compared against the threshold captured from the closure.
            traded_value = close_price * volume
            out[:] = np.mean(traded_value, axis=0) > amount

    return AvgDailyDollarVolumeTraded(window_length=lookback)
def period_returns(lookback):
    """
    Build a pipeline factor that reports the simple return between the
    first and the last close in the lookback window.

    Args:
        lookback (int): lookback window size, used as the factor's
            ``window_length``

    Returns:
        A custom factor object.

    Examples::

        # from library.pipelines.pipelines import period_returns
        pipe = Pipeline()
        momentum = period_returns(200)
        pipe.add(momentum,'momentum')
    """

    class SignalPeriodReturns(CustomFactor):
        inputs = [EquityPricing.close]

        def compute(self, today, assets, out, close_price):
            # Simple return: last row of the window over the first row,
            # minus one.
            first, last = close_price[0], close_price[-1]
            out[:] = last / first - 1

    return SignalPeriodReturns(window_length=lookback)
def initialize(context):
    """
    Prepare the strategy at start-up.

    Stores the strategy parameters on ``context``, schedules
    ``strategy`` to run at the start of each month, and attaches the
    pipeline under the name ``'strategy_pipeline'``.
    """
    # Kept on context so the other callbacks can read the same values.
    context.params = {
        'lookback': 12,
        'percentile': 0.1,
        'min_volume': 1E7,
    }

    # Monthly schedule, using the market_close(minutes=1) time rule.
    when_date = date_rules.month_start()
    when_time = time_rules.market_close(minutes=1)
    schedule_function(strategy, when_date, when_time)

    # Register the pipeline that generate_signals() reads back by name.
    pipeline = make_strategy_pipeline(context)
    attach_pipeline(pipeline, name='strategy_pipeline')
def strategy(context, data):
    """
    Scheduled callback: refresh the long/short selections, then
    rebalance the portfolio towards them.
    """
    # Order matters: rebalance() reads what generate_signals() stores
    # on context.
    for step in (generate_signals, rebalance):
        step(context, data)
def make_strategy_pipeline(context):
    """
    Assemble the strategy pipeline.

    The pipeline has a single ``'momentum'`` column produced by
    ``period_returns`` and is screened with ``average_volume_filter``.
    Both use the same window length.

    Args:
        context: strategy context; ``context.params['lookback']`` and
            ``context.params['min_volume']`` are read

    Returns:
        The configured Pipeline object.
    """
    pipe = Pipeline()

    params = context.params
    # NOTE(review): the factor of 21 presumably converts months to
    # trading days; confirm.
    window = params['lookback'] * 21
    min_volume = params['min_volume']

    screen = average_volume_filter(window, min_volume)
    returns_factor = period_returns(window)

    pipe.add(returns_factor, 'momentum')
    pipe.set_screen(screen)
    return pipe
def generate_signals(context, data):
    """
    Select the long and short baskets from the latest pipeline output.

    Rows with positive momentum are long candidates and rows with
    negative momentum are short candidates. With ``n`` equal to the
    ``percentile`` fraction of the smaller candidate pool (rounded
    down), the ``n`` highest-momentum longs and the ``n``
    lowest-momentum shorts are kept.

    Args:
        context: strategy context; reads
            ``context.params['percentile']`` and writes
            ``context.long_securities`` and
            ``context.short_securities``
        data: only ``data.current_dt`` is read, for the "no signals"
            message

    Returns:
        None. Both selections are set to empty lists when the pipeline
        output is unavailable or when ``n`` is zero.
    """
    try:
        pipeline_results = pipeline_output('strategy_pipeline')
    except Exception:
        # Deliberate best effort: without pipeline output there is
        # nothing to trade. ``Exception`` (not a bare ``except``) so
        # that KeyboardInterrupt / SystemExit still propagate.
        context.long_securities = []
        context.short_securities = []
        return

    p = context.params['percentile']
    momentum = pipeline_results

    # Masking leaves NaN where the condition fails; dropna() removes
    # those rows. Both candidate frames are sorted ascending.
    long_candidates = momentum[momentum > 0].dropna().sort_values('momentum')
    short_candidates = momentum[momentum < 0].dropna().sort_values('momentum')

    n_long = len(long_candidates)
    n_short = len(short_candidates)
    n = int(min(n_long, n_short) * p)

    if n == 0:
        print("{}, no signals".format(data.current_dt))
        context.long_securities = []
        context.short_securities = []
        # Must stop here: index[-0:] is the whole index, so falling
        # through would select every long candidate.
        return

    # Ascending sort: the tail holds the strongest winners, the head
    # holds the strongest losers.
    context.long_securities = long_candidates.index[-n:]
    context.short_securities = short_candidates.index[:n]
def rebalance(context, data):
    """
    Move the portfolio to the current long and short selections.

    Every selected security gets a target of ``0.5 / n`` of the
    portfolio (negative for shorts), where ``n`` is the number of long
    securities. Held positions that are in neither selection are
    targeted to zero. Nothing is ordered when there are no longs.

    Args:
        context: strategy context; reads ``long_securities``,
            ``short_securities`` and ``portfolio.positions``
        data: unused here

    Returns:
        None
    """
    n = len(context.long_securities)
    if n < 1:
        return

    longs = context.long_securities
    shorts = context.short_securities
    weight = 0.5 / n

    # Close out anything held that is no longer selected.
    for security in context.portfolio.positions:
        if not (security in longs or security in shorts):
            order_target_percent(security, 0)

    # Longs first, then shorts, each at the same absolute weight.
    for basket, target in ((longs, weight), (shorts, -weight)):
        for security in basket:
            order_target_percent(security, target)

BLimitless
- 2,060
- 5
- 17
- 32