Snippets
A little help from my friend
Here you will find tips for pimping your Neo to make testing easier.
Put your host ip into /etc/hosts
SSH does reverse lookups of the IPs that try to log in. To speed that up, add lines like these to your /etc/hosts:
192.168.0.200 hostpc
192.168.0.202 gta02
For more about networking with neos see networked
Pimp your ssh I
If you think empty password entry is annoying, put your pub-key on the neo. If you don’t have one, just create a new priv/pub key pair using
ssh-keygen -t rsa
For now, just press Enter to answer any questions ssh-keygen asks you. Then put the file .ssh/id_rsa.pub on the neo – as .ssh/authorized_keys:
ssh root@gta02 'mkdir .ssh'
cat ~/.ssh/*.pub|ssh root@gta02 'cat >.ssh/authorized_keys'
Give your empty password twice for the commands above. That should be the last two times you had to give the root password.
Pimp your ssh II
Current pyneo images keep your ssh keys on the SD card. After flashing, the host keys are recovered from the SD card; the client keys are either symlinked there, or the user's home directory is the mount point of the SD card.
Monitor D-Bus
D-Bus has lots of tools for debugging. For watching signals sent, just do a
dbus-monitor --system
Use your host for syslog
Reduce the file /etc/syslog.conf to the following single line:
*.* @hostpc
Then start a UDP receiver on your host (I changed the port to 1514 so that a normal user can run it). A simple Python script does that:
#!/usr/bin/env python
from socket import socket, AF_INET, SOCK_DGRAM
s = socket(AF_INET, SOCK_DGRAM)
# unprivileged port 1514, so no root is needed; the sender must use the same port
s.bind(('192.168.0.200', 1514))
while True:
    line = s.recv(2048)
    print line
You may use that with egrep to filter things you are interested in:
syslogl.py|egrep '(gsmd|pppd|chat)'
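If egrep on the program name is not enough, you can also filter on severity. Each syslog datagram starts with a `<PRI>` field where PRI = facility * 8 + severity. The following is a minimal sketch of decoding it; the helper name `split_priority` and the tuple it returns are my own choices for illustration, not part of any syslog library.

```python
import re

# Severity names in numeric order 0..7, as used by the BSD syslog protocol.
SEVERITIES = ('emerg', 'alert', 'crit', 'err',
              'warning', 'notice', 'info', 'debug')
PRI = re.compile(r'^<(\d{1,3})>')


def split_priority(datagram):
    """Return (facility, severity_name, rest) for one syslog datagram.

    Hypothetical helper: datagrams without a leading <PRI> field come
    back as (None, None, text).
    """
    if isinstance(datagram, bytes):
        text = datagram.decode('utf-8', 'replace')
    else:
        text = datagram
    match = PRI.match(text)
    if not match:
        return None, None, text
    pri = int(match.group(1))
    # PRI = facility * 8 + severity
    return pri // 8, SEVERITIES[pri % 8], text[match.end():]
```

In the receiver loop you could then print only lines whose severity is, say, 'err' or worse.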
Use screen!
Install the screen package and use it. It makes life easy. If you use screen as a terminal emulator talking to your modem or gps-chip, don’t forget to enable HW flow control. Please read the docs about it if you haven’t used it before as this will let you make use of it to the fullest.
Setting day/time
The neo often didn’t have the correct time on startup so I used several python lines to set it:
Retrieve the time
#!/usr/bin/env python
import socket
import struct
from datetime import datetime, timedelta
def rdate(host="time.fu-berlin.de", port=37, ):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((host, port))
    t = s.recv(4)
    s.close()
    t = struct.unpack("!I", t)[0]
    # the time protocol counts from 1900-01-01; subtract the offset to 1970-01-01 00:00:00
    return datetime.fromtimestamp(t - 2208988800)
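The magic number 2208988800 is the number of seconds between 1900-01-01 (where the time protocol on port 37 starts counting) and 1970-01-01 (the Unix epoch). Here is a small sketch that decodes an answer without touching the network and reads the four bytes robustly, since a single recv(4) on a stream socket is allowed to return fewer bytes. The names `recv_exact`, `decode_rfc868` and `RFC868_OFFSET` are made up for this example.

```python
import struct
from datetime import datetime

# Seconds from 1900-01-01 00:00:00 to 1970-01-01 00:00:00.
RFC868_OFFSET = 2208988800


def recv_exact(sock, count):
    """Read exactly count bytes from a stream socket or raise IOError."""
    data = b''
    while len(data) < count:
        chunk = sock.recv(count - len(data))
        if not chunk:
            raise IOError('connection closed after %d bytes' % len(data))
        data += chunk
    return data


def decode_rfc868(raw):
    """Convert the 4-byte big-endian time protocol answer to Unix seconds."""
    return struct.unpack('!I', raw)[0] - RFC868_OFFSET
```

With these, the body of a function like rdate could use `decode_rfc868(recv_exact(s, 4))` in place of the bare recv and unpack.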
Set the time
#!/usr/bin/env python
from ctypes import cdll, c_int, c_long, c_void_p, byref, POINTER, Structure
def settimeofday(sec_since_epoc):
    libc = cdll['libc.so.6']
    # struct timeval: on Linux/glibc both fields are of type long
    class timeval(Structure):
        _fields_ = [('tv_sec', c_long), ('tv_usec', c_long)]
    libc.settimeofday.restype = c_int
    libc.settimeofday.argtypes = POINTER(timeval), c_void_p
    t = timeval()
    t.tv_sec = int(sec_since_epoc)
    # returns 0 on success and -1 on error (e.g. when not run as root)
    return libc.settimeofday(byref(t), None)
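The two snippets do not fit together directly: the first returns a datetime in local time, the second wants seconds since the epoch. A minimal sketch of the glue, assuming the datetime is a naive local time as produced by datetime.fromtimestamp; the helper name `to_epoch_seconds` is hypothetical.

```python
import time
from datetime import datetime


def to_epoch_seconds(local_dt):
    """Convert a naive local-time datetime to whole seconds since 1970-01-01 UTC."""
    # time.mktime interprets the tuple as local time, the inverse of
    # datetime.fromtimestamp for unambiguous times.
    return int(time.mktime(local_dt.timetuple()))

# Intended use on the device (needs root, so it is left commented out):
# settimeofday(to_epoch_seconds(rdate()))
```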
Daemons with glib
It took me a long time to find a way to implement a unix philosophy-style daemon the right way with python. I think with glib this is possible. I avoid threads. I want to use the good old select()-call. And I need dbus. All this leads to glib. And glib makes life easy:
#!/usr/bin/env python
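from gobject import timeout_add, MainLoop
from dbus import SystemBus
from dbus.mainloop.glib import DBusGMainLoop
from dbus.service import Object as NotifyObject, signal as notify, method, BusName

# interface name assumed for this sample; the original snippet does not define it
DBUS_INTERFACE = 'de.neo1973germany.Sample'

class Power(NotifyObject):
    def __init__(self):
        NotifyObject.__init__(self,
            object_path='/de/neo1973germany/Sample',
            bus_name=BusName('de.neo1973germany', SystemBus()),
            )
        self.last_status = {}
        # poll every 9 seconds
        timeout_add(9 * 1000, self.__poll)

    @method(DBUS_INTERFACE, '', 'a{sv}')
    def GetStatus(self):
        return self.last_status

    @notify(DBUS_INTERFACE, 'a{sv}')
    def Status(self, newmap):
        self.last_status = newmap

    def __poll(self):
        newmap = dict(self.last_status,
            # set new fields here
            )
        if self.last_status != newmap:
            self.Status(newmap)
        return True # keep the timeout running

if __name__ == '__main__':
    # hook dbus into the glib main loop before connecting to the bus
    DBusGMainLoop(set_as_default=True)
    Power()
    MainLoop().run()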
Other methods allow wakeup on all important events in a unix system:
gobject.io_add_watch(fd, gobject.IO_IN, self.__io)
gobject.child_watch_add(pid, self.__child)
Remember that we don’t have threads in this model, so be quick in your callback methods. Don’t do any long calculating or parsing or whatever in them. Fire & return! If you need to parse or calculate something, ask glib to call you back when it is idle.
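To make the model concrete, here is a toy version of such a loop built directly on select(). It is only a sketch of the idea: the class `MiniLoop` and its method names imitate the glib calls above but are invented for this example, and the real glib main loop does much more (priorities, timeouts, child watches).

```python
import os
import select


class MiniLoop(object):
    """Toy select()-based loop: fd watches plus idle callbacks."""

    def __init__(self):
        self.watches = {}   # fd -> callback(fd)
        self.idle = []      # callbacks run when no fd is ready

    def io_add_watch(self, fd, callback):
        self.watches[fd] = callback

    def idle_add(self, callback):
        self.idle.append(callback)

    def iterate(self, timeout=0.1):
        ready, _, _ = select.select(list(self.watches), [], [], timeout)
        for fd in ready:
            # As with glib, a callback returning False removes its watch.
            if not self.watches[fd](fd):
                del self.watches[fd]
        if not ready and self.idle:
            callback = self.idle.pop(0)
            if callback():
                self.idle.append(callback)


def demo():
    events = []
    r, w = os.pipe()
    loop = MiniLoop()

    def on_readable(fd):
        events.append('io:' + os.read(fd, 16).decode())
        return False    # one-shot watch

    def on_idle():
        events.append('idle')
        return False    # run only once

    loop.io_add_watch(r, on_readable)
    loop.idle_add(on_idle)
    os.write(w, b'ping')
    loop.iterate()            # pipe is readable, so on_readable runs
    loop.iterate(timeout=0)   # nothing is ready, so on_idle runs
    os.close(r)
    os.close(w)
    return events
```

Everything runs in one thread, which is exactly why a slow callback stalls all other watches.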
Be asynchronous!!!
Use async_callbacks where needed, e.g. when you cannot answer a request right away and don’t want to block: store the callbacks, return, and send the reply later.
def got_data(self, data):
    self.cb_d(data)
@method(DIN_PHONE, '', 'a{sv}', async_callbacks=('cb_d', 'cb_e', ))
def DeviceGetInfo(self, cb_d, cb_e):
    self.cb_d = cb_d
    self.cb_e = cb_e
Be async as a client also
Being async on the client side is nearly as important as on the server side – remember how annoying hanging GUIs are! So really always do calls as follows:
def DeviceInfo(response):
    pass
some_interface.DeviceGetInfo(reply_handler=DeviceInfo, error_handler=log, )
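The whole flow, request now and answer later, can be tried out without a bus. The following is a D-Bus-free model, not dbus-python API: `FakePhone` and its `failed` method are invented for illustration. Note one difference from the server snippet above, which keeps a single cb_d/cb_e pair: there a second request arriving before the answer would overwrite the first, so this sketch queues the pairs instead.

```python
class FakePhone(object):
    """Stand-in for a service object; a model only, not dbus-python API."""

    def __init__(self):
        self.pending = []   # (reply, error) pairs waiting for data

    def DeviceGetInfo(self, reply_handler, error_handler):
        # Remember who asked and return at once; nothing blocks here.
        self.pending.append((reply_handler, error_handler))

    def got_data(self, data):
        # Called later, e.g. from an io watch, once the answer is in.
        pending, self.pending = self.pending, []
        for reply, _error in pending:
            reply(data)

    def failed(self, exc):
        # Called later if the request could not be answered.
        pending, self.pending = self.pending, []
        for _reply, error in pending:
            error(exc)
```

The caller gets control back immediately after DeviceGetInfo and is notified through exactly one of its two handlers.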
Getting started with e* fast
I use python for functionality and edje for screen design. This is a nice fit with the e-bindings. This is the smallest program using e&py on the neo (with the neo’s fixed screen size, in full screen):
#!/usr/bin/env python
import ecore
import ecore.evas
ee = ecore.evas.SoftwareX11_16(w=480, h=640)
ee.fullscreen = 1
ee.show()
ecore.main_loop_begin()
If you want, just start
DISPLAY=:0 python
on your device and enter the lines above. Be sure X is running. If you encounter any problems you’ll see the following message:
*** ECORE ERROR: Ecore Magic Check Failed!!!
*** IN FUNCTION: ecore_evas_data_set()
  Input handle pointer is NULL!
*** NAUGHTY PROGRAMMER!!!
*** SPANK SPANK SPANK!!!
*** Now go fix your code. Tut tut tut!
e completely lacks an error handling or reporting system. If you run into an error you are lost and get verbally harassed instead. I’m sorry about that, just try to ignore those lines. I try to do so, too. If you encounter that message, check the following:
- Did you set DISPLAY?
- Have you installed the engines, loaders and savers you try to use?
This is a snippet that puts something on the screen, based on evas:
#!/usr/bin/env python
import ecore
import ecore.evas
import evas
class Area(evas.ClippedSmartObject):
    def __init__(self, canvas):
        evas.ClippedSmartObject.__init__(self, canvas)
    def resize(self, w, h):
        self.bg.resize(w, h)
    def delete(self):
        evas.ClippedSmartObject.delete(self)
ee = ecore.evas.SoftwareX11_16(w=480, h=640)
ee.fullscreen = 1
ee.name_class = ('pyneo', 'zad')
ee.title = 'zad'
obj = Area(ee.evas)
obj.bg = obj.Rectangle(color=(244, 153, 0, 255))
obj.bg.show()
obj.size = 300, 300,
obj.center = ee.evas.rect.center
obj.show()
ee.show()
ecore.main_loop_begin()
Evas is very low level. Edje allows screen design at a higher level: the design is described in a file with its own syntax.
To start with edje you need a compiler on your host PC. Edje files (extension .edc) are compiled to binary files (extension .edj; don’t ask me why these extensions imply the opposite) which contain the images, fonts and layouts of your GUI:
collections {
    images {
        image: "logo.png" COMP;
    }
    group {
        name: "background";
        min: 480 640;
        parts {
            part {
                name: "background_image";
                type: IMAGE;
                description {
                    rel1 {
                        relative: 0.0 0.0;
                    }
                    rel2 {
                        relative: 1.0 1.0;
                    }
                    image {
                        normal: "logo.png";
                    }
                }
            }
        }
    }
}
If you don’t want to use a complete e installation but have compiled e with bitbake/mokomakefile you may use the edje compiler for i686 that was compiled already. Just set the path to find the shared libs and start the compiler with a path like:
EPATH=$OM/tmp/staging/i686-linux/usr
LD_LIBRARY_PATH=$EPATH/lib \
$EPATH/bin/edje_cc \
-fd /usr/share/fonts/truetype/ttf-bitstream-vera \
-id . \
blaha.edc \
blaha.edj
Edje binary files are archives containing the compiled text plus all referenced images and fonts. In your program you always reference elements (named parts) of the edje by file name plus part name:
part = edje.Edje(ee.evas, file='/path/to/gui.edj', group='foreground')
Playing sound
#!/usr/bin/env python
import pygst
pygst.require('0.10')
import gst
import gobject, sys
def play_uri(location):
    " play a file like: foobar.mp3 "
    def on_new_decoded_pad(element, pad, last):
        # link the decoder to the converter as soon as an audio pad shows up
        caps = pad.get_caps()
        name = caps[0].get_name()
        if name == 'audio/x-raw-float' or name == 'audio/x-raw-int':
            if not apad.is_linked():
                pad.link(apad)
    def on_eos(bus, msg):
        print 'on_eos'
        pipeline.set_state(gst.STATE_NULL)
        mainloop.quit()
    def on_tag(bus, msg):
        taglist = msg.parse_tag()
        print 'on_tag:'
        for key in taglist.keys():
            print '\t%s = %s' % (key, taglist[key])
    def on_error(bus, msg):
        error = msg.parse_error()
        print 'on_error:', error[1]
        # release the audio device before leaving the loop
        pipeline.set_state(gst.STATE_NULL)
        mainloop.quit()
    # the pipeline
    pipeline = gst.Pipeline()
    # create bus and connect several handlers
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect('message::eos', on_eos)
    bus.connect('message::tag', on_tag)
    bus.connect('message::error', on_error)
    # create elements
    src = gst.element_factory_make('filesrc')
    dec = gst.element_factory_make('decodebin')
    conv = gst.element_factory_make('audioconvert')
    rsmpl = gst.element_factory_make('audioresample')
    sink = gst.element_factory_make('alsasink')
    print 'Playing:', location
    src.set_property('location', location)
    dec.connect('new-decoded-pad', on_new_decoded_pad)
    pipeline.add(src, dec, conv, rsmpl, sink)
    src.link(dec)
    gst.element_link_many(conv, rsmpl, sink)
    apad = conv.get_pad('sink')
    mainloop = gobject.MainLoop()
    pipeline.set_state(gst.STATE_PLAYING)
    mainloop.run()
if __name__ == '__main__':
    # play the ringtone three times
    for i in range(3):
        play_uri('ringtone_simple02.mp3')
Python & sqlite
#!/usr/bin/env python
from sqlite3 import connect
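# example fragment of a phone number to search for; not part of the original snippet
number = '0171'
connection = connect('pim.db')
cursor = connection.cursor()
# the ? placeholder lets sqlite do the quoting; % matches any run of characters
cursor.execute('SELECT firstname, lastname, mobil FROM contacts WHERE mobil LIKE ?', ('%%%s%%'% number, ))
for row in cursor:
    print 'firstname:', row[0]
    print 'lastname:', row[1]
    print 'mobil:', row[2]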
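If you have no pim.db at hand, the same query can be tried against a throwaway in-memory database. This is only a sketch: the three column names come from the query above, but the table layout as a whole, the sample rows and the helper names `find_by_number` and `demo` are assumptions for illustration.

```python
import sqlite3


def find_by_number(connection, number):
    """Return all contacts whose mobil column contains the given digits."""
    cursor = connection.cursor()
    cursor.execute(
        'SELECT firstname, lastname, mobil FROM contacts WHERE mobil LIKE ?',
        ('%' + number + '%',))
    return cursor.fetchall()


def demo():
    # ':memory:' gives a private database that vanishes on close.
    connection = sqlite3.connect(':memory:')
    connection.execute(
        'CREATE TABLE contacts (firstname TEXT, lastname TEXT, mobil TEXT)')
    connection.executemany('INSERT INTO contacts VALUES (?, ?, ?)', [
        ('Ada', 'Example', '+49171555001'),
        ('Bob', 'Sample', '+49160555002'),
    ])
    try:
        return find_by_number(connection, '160')
    finally:
        connection.close()
```

Keep in mind that % and _ inside the search string itself also act as LIKE wildcards.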
as josch says: readdir rulez
#!/usr/bin/env python
# -*- coding: utf8 -*-
import ctypes
libc = ctypes.CDLL("libc.so.6")
# struct dirent as laid out on a 32-bit system without large file support
class Entry(ctypes.Structure):
    _fields_ = (
        ("ino", ctypes.c_int32),
        ("off", ctypes.c_int32),
        ("reclen", ctypes.c_int16),
        ("type", ctypes.c_char),
        ("name", ctypes.ARRAY(ctypes.c_char, 256)),
        )
# stand-in for the opaque DIR structure; it is only ever handled through pointers
class Stream(ctypes.Structure):
    _fields_ = (
        ("fd", ctypes.c_int32),
        ("data", ctypes.c_char_p),
        ("allocation", ctypes.c_uint32),
        ("offset", ctypes.c_uint32),
        ("size", ctypes.c_uint32),
        ("dirent", Entry),
        )
libc.opendir.argtypes = ctypes.c_char_p,
libc.opendir.restype = ctypes.POINTER(Stream)
libc.readdir.argtypes = ctypes.POINTER(Stream),
libc.readdir.restype = ctypes.POINTER(Entry)
libc.rewinddir.argtypes = ctypes.POINTER(Stream),
#libc.rewinddir.restype = ctypes.c_void
libc.telldir.argtypes = ctypes.POINTER(Stream),
libc.telldir.restype = ctypes.c_long
libc.seekdir.argtypes = ctypes.POINTER(Stream), ctypes.c_long,
#libc.seekdir.restype = ctypes.c_void
libc.closedir.argtypes = ctypes.POINTER(Stream),
libc.closedir.restype = ctypes.c_int
class Dir(object):
    def __init__(self, path):
        self.__stream = libc.opendir(path)
        if not self.__stream:
            # opendir returned NULL; calling readdir on it would crash
            raise OSError('cannot open directory %r' % path)
        self.pos = 0
        self.max = None
        self.entry = libc.readdir(self.__stream)
    def __getslice__(self, i, j):
        # start over and skip the first i entries
        libc.rewinddir(self.__stream)
        self.pos = 0
        while self.entry and self.pos < i and self.pos < j:
            self.entry = libc.readdir(self.__stream)
            self.pos += 1
        self.entry = libc.readdir(self.__stream)
        self.max = j
        return self
    def __iter__(self):
        while self.entry and (self.max is None or self.pos < self.max):
            yield self.entry.contents
            self.entry = libc.readdir(self.__stream)
            self.pos += 1
    #def next(self):
        #raise StopIteration()
    def __del__(self):
        if self.__stream:
            libc.closedir(self.__stream)
        self.__stream = None
for n in Dir("/var/lib/dpkg/info/")[0:10]:
    print n.name
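For comparison: on a current Python 3 (which is not what runs on the neo images described here) the standard library exposes the same lazy directory reading through os.scandir, so the first few entries of a huge directory can be fetched without listing all of it. A minimal sketch; the helper name `first_entries` is made up.

```python
import os
from itertools import islice


def first_entries(path, count):
    """Return up to count entry names without reading the whole directory."""
    # os.scandir yields entries lazily; islice stops after count of them.
    with os.scandir(path) as entries:
        return [entry.name for entry in islice(entries, count)]
```

Unlike the raw readdir loop above, os.scandir never yields the "." and ".." entries.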